Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added share.fetch.purgatory.purge.interval.requests

...

Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. For a share group, the group coordinator does not persist the assignment. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.

...

For a share group, the group coordinator persists three seven kinds of records:

  • ShareGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
  • ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
  • ShareGroupPartitionMetadata - this is written to persist the metadata about the partitions the group is assigning to the members.
  • ShareGroupTargetAssignmentMetadata - this is written to persist the assignment epoch.
  • ShareGroupTargetAssignmentMember - this is written to persist the target assignment for a member.
  • ShareGroupCurrentMemberAssignment - this is written to persist the current assignment for a member, and also to maintain the sequence of member epochs.
  • ShareGroupStatePartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.

When the group coordinator fails over, the newly elected coordinator replays the state from the __consumer_offsets  partition. This means a share group will remain in existence across the fail-over, along with the list of members . However, the assignments are not persisted and will be recalculated by the new group coordinatorand their assignments. It will bump the group epoch as a result.

...

The group coordinator is responsible for asking the share coordinator to initialize and delete the durable share-partition state. The group coordinator uses the ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record to keep track of which share-partitions have been initialized.

...

  • When a topic is added to the set of subscribed topics for a share group and is not yet in the ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record
  • When partitions are added to a topic which is already in the ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record
  • When KafkaAdmin.alterShareGroupOffsets  is used to reset the SPSO for a share-partition

The state is deleted in the following cases:

  • When a topic in the ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record is deleted
  • When KafkaAdmin.deleteShareGroupOffsets  is used to delete state for a share-partition, most likely as a result of an administrator cleaning up a topic which is no longer in use by this share group
  • When the share group is deleted

The group coordinator periodically reconciles its "hard" state in the ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata with the "soft" state in the cluster metadata. This is how it observes relevant changes to topics, such as topic deletion. The ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata contains the set of topic-partitions which are known to be initialized.

...

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record with the topic added.

...

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the new partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record with the array of partitions for the topic increased.

...

  • If the topic still exists, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record with the topic added to the DeletingTopics  array. This enables group coordinator failover to continue the deletion. If the topic does not exist, the requirement to continue deletion can be inferred by the non-existence of the topic.
  • Sends the DeleteShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record with the topic removed.

...

OperationInvolvesNotes
Create share groupGroup coordinator

This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat  RPC. It writes a ShareGroupMetadata record and a ShareGroupPartitionMetadata record for the share group, a ShareGroupMemberMetadata record for the member onto the __consumer_offsets  topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat  RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState  to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState  RPC. It writes a ShareSnapshot record to the __share_group_state  topic. When the record is replicated, the share coordinator responds to the RPC.

d) Back in the group coordinator, it writes a ShareGroupPartitionMetadata ShareGroupPartitionStateMetadata record on the __consumer_offsets  topic for the share-partitions which are now initialized. The share-partitions are now able to be included in an assignment in the share group.

Assign a share-partitionGroup coordinator and optionally share coordinatorWhen a topic-partition is eligible to be assigned to a member of a share group for the first time, the group coordinator sends an InitializeShareGroupState  request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state  topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group.
List share groupsGroup coordinator
Describe share groupGroup coordinator
List share group offsetsGroup coordinator and share coordinatorThe admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupStateSummary  request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share-coordinator when it is next required.
Alter share group offsetsGroup coordinator and share coordinatorOnly empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata, and sends an InitializeShareGroupState  request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state  topic. If the partition was not previously initialized, the group coordinator writes an updated ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record.
Delete share group offsetsGroup coordinator and share coordinatorThis is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record to the __consumer_offsets  topic adding the topic to the DeletingTopics  array and removing it from the InitializedTopics  array. This enables an interrupted deletion to be completed. The group coordinator sends a DeleteShareGroupState  request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes an updated ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record to the __consumer_offsets  topic to complete the deletion of the topic from the share group.
Delete share groupGroup coordinator and share coordinator

Only empty share groups can be deleted. If the share group has any topics with initialized state, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record to the __consumer_offsets  topic moving all initialized topics into the DeletingTopics  array, and then it sends a DeleteShareGroupState  request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state  topic. Then, the group coordinator writes a tombstone ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata and finally a tombstone ShareGroupMetadata record to the __consumer_offsets  topic.

...

  • InvalidRecordStateException  - The record state is invalid. The acknowledgement of delivery could not be completed.
  • ShareSessionNotFoundException  - The share session is not found.
  • InvalidShareSessionEpochException  - The share session epoch is invalid.
  • FencedStateEpochException  - The share coordinator rejected the request because the share-group state epoch did not match.

They are all subclasses of RetriableException .

...

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Pass in groups to delete topic partition offsets over the entire Delete share group. For instance --group g1 --group g2

--delete-offsets

Delete offsets of share group. Supports one share group at the time, and multiple topics.

--describe

Describe share group, members and list offset lag (number of records not yet processed) related to given groupoffset information.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--help

Print usage information.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of and may be used with the '--describe' option only.

--reset-offsets

Reset offsets of share group. Supports one share group at a time, and instances must be inactive. If neither Has 2 execution options, '--dry-run' nor (the default) and '–execute' is specified,'. Has 3 reset options: '--to-earliest', '--to-latest' and '--to-datetime'. 

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Stable' and 'Dead'.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000). (default: 5000)   

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or included in the reset offset process. In `reset-offsets` caseWhen resetting offsets, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included.

--version

Display Kafka version.

...

ConfigurationDescriptionValues
group.coordinator.rebalance.protocols The list of enabled rebalance protocols. (Existing configuration)

"share"  is included in the list of protocols to enable share groups.

This will be added to the default value of this configuration property once this feature is complete.

group.share.delivery.count.limitThe maximum number of delivery attempts for a record delivered to a share group.Default 5, minimum 2, maximum 10
group.share.record.lock.duration.msShare-group record acquisition lock duration in milliseconds.Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)
group.share.min.record.lock.duration.ms Share-group record acquisition lock minimum duration in milliseconds.Default 15000 (15 seconds), minimum 1000 (1 second), maximum 30000 (30 seconds)
group.share.max.record.lock.duration.msShare-group record acquisition lock maximum duration in milliseconds.Default 60000 (60 seconds), minimum 30000 (30 seconds), maximum 3600000 (1 hour)
group.share.partition.max.record.locksShare-group record lock limit per share-partition.Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms 

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.groups 

The maximum number of share groups.

Default 10, minimum 1, maximum 100
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000
group.share.assignors 

The server-side assignors as a list of full class names. The list must contain only a single entry which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors.

A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor"
share.coordinator.state.topic.num.partitions 

The number of partitions for the share-group state topic (should not change after deployment).

Default 50
share.coordinator.state.topic.replication.factor 

The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.state.topic.segment.bytes 

The log segment size for the share-group state topic.

Default 104857600
share.coordinator.state.topic.min.isr 

Overridden min.insync.replicas for the share-group state topic.

Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.state.topic.compression.codec 

Compression codec for the share-group state topic.

Default 0 (NONE)
share.coordinator.append.linger.ms 

The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.

Default 10, minimum 0
share.coordinator.load.buffer.size 

Batch size for reading from the share-group state topic when loading state information into the cache (soft-limit, overridden if records are too large).

Default 5 * 1024 * 1024 (5242880), minimum 1
share.coordinator.snapshot.update.records.per.snapshot 

The number of update records the share coordinator writes between snapshot records.

Default 500, minimum 0
share.coordinator.threads 

The number of threads used by the share coordinator.

Default 1, minimum 1
share.coordinator.write.timeout.ms 

The duration in milliseconds that the share coordinator will wait for all replicas of the share-group state topic to receive a write.

Default 5000, minimum 1
share.fetch.purgatory.purge.interval.requests 

The purge interval (in number of requests) of the share fetch request purgatory

Default 1000

Group configuration

The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.

...

  • FindCoordinator  - for finding coordinators, to support share coordinators
  • ListGroups  - for listing groups, to support listing share groups

Access controlAccess control

This table gives the ACLs required for the new APIs.

...

This KIP adds the following error codes the Kafka protocol.

  • INVALID_RECORD_STATE  (121) - The record state is invalid. The acknowledgement of delivery could not be completed.
  • SHARE_SESSION_NOT_FOUND  (122) - The share session is not found.
  • INVALID_SHARE_SESSION_EPOCH  (123) - The share session epoch is invalid.
  • FENCED_STATE_EPOCH  (124) - The share coordinator rejected the request because share-group state epoch did not match.

...

The KIP introduces version 56.

Request schema

Version 5 6 adds the new key type of FindCoordinatorRequest.CoordinatorType.SHARE with value 2 with the key of "group:topicId:partition".

Code Block
{
  "apiKey": 10,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "FindCoordinatorRequest",
  // Version 1 adds KeyType.
  //
  // Version 2 is the same as version 1.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds support for batching via CoordinatorKeys (KIP-699)
  //
  // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
  //
  // Version 6 adds support for share groups (KIP-932).
  "validVersions": "0-56",
  "deprecatedVersions": "0",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "Key", "type": "string", "versions": "0-3",
      "about": "The coordinator key." },
    { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
      "about": "The coordinator key type. (Group, transaction, share, etc.)" },
    { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
      "about": "The coordinator keys." }
  ]
}

Response schema

Version 5 6 is the same as version 45.

ListGroups API

This KIP introduces version 6 

Request schema

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

Request schema

The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeatVersion 6 adds support for share groups.

Code Block
{
  "apiKey": 1676,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListGroupsRequestShareGroupHeartbeatRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the StatesFilter field (KIP-518).
  //
  // Version 5 adds the TypesFilter field (KIP-848).
  //
  // Version 6 adds support for share groups.
  "validVersions": "0-6",
  "flexibleVersions": "3+",
  "fields": ["validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
    { "name": "StatesFilterMemberEpoch", "type": "[]stringint32", "versions": "40+",
      "about": "The statescurrent ofmember the groups we wantepoch; 0 to list.join Ifthe empty, all groups are returned with their stategroup; -1 to leave the group." },
    { "name": "TypesFilterRackId", "type": "[]string", "versions": "0+",  "nullableVersions": "50+", "default": "null",
      "about": "Thenull typesif ofnot theprovided groupsor weif wantit to list. If empty, all groups are returned with their typedidn't change since the last heartbeat; the rack ID of consumer otherwise." },
    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }
  ]
}

Response schema

Version 6 is the same as version 5.

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

Request schema

The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeat.coordinator will only send the Assignment field when it changes.

Code Block
{
  "apiKey": TBD76,
  "type": "request",
  "listeners": ["broker"]response",
  "name": "ShareGroupHeartbeatRequestShareGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },// Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  "fields": [
    { "name": "MemberIdThrottleTimeMs", "type": "stringint32", "versions": "0+",
      "about": "The duration memberin IDmilliseconds generatedfor bywhich the coordinator. The member ID must be kept during the entire lifetime of the member request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "MemberEpochErrorCode", "type": "int32int16", "versions": "0+",
      "about": "The top-level currenterror membercode, epoch;or 0 toif jointhere thewas group; -1 to leave the group.no error" },
    { "name": "RackIdErrorMessage", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
      "about": "nullThe iftop-level noterror providedmessage, or null if itthere didn'twas change since the last heartbeat; the rack ID of consumer otherwiseno error." },
    { "name": "SubscribedTopicNamesMemberId", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "nullThe member ifID itgenerated didn'tby changethe sincecoordinator. theOnly lastprovided heartbeat;when the subscribed topic names otherwisemember joins with MemberEpoch == 0." },
     ]}
  ]
}

Response schema

The group coordinator will only send the Assignment field when it changes.

Code Block
{
  "apiKey": TBD,
  "type": "response",
 { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "ShareGroupHeartbeatResponseHeartbeatIntervalMs",
  "validVersionstype": "0int32",
  "flexibleVersionsversions": "0+",
    // Supported errors:"about": "The heartbeat interval in milliseconds." },
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.", "fields": [
        { "name": "ThrottleTimeMsTopicPartitions", "type": "int32[]TopicPartitions", "versions": "0+",
          "about": "The durationpartitions in milliseconds for whichassigned to the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
member." }
    ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "ErrorCodeTopicId", "type": "int16uuid", "versions": "0+",
          "about": "The top-level error code, or 0 if there was no error" },
  topic ID." },
        { "name": "ErrorMessagePartitions", "type": "string[]int32", "versions": "0+",
 "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersionspartitions." }
    ]}
  ]
}

ShareGroupDescribe API

The ShareGroupDescribe API is used to describe share groups.

Request schema

Code Block
{
  "apiKey": 77,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupDescribeRequest",
  "validVersions": "0+",
  "defaultflexibleVersions": "null0+",
      "aboutfields": "The member ID generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },[
    { "name": "MemberEpochGroupIds", "type": "int32[]string", "versions": "0+", "entityType": "groupId",
      "about": "The member epoch. ids of the groups to describe" },
    { "name": "HeartbeatIntervalMsIncludeAuthorizedOperations", "type": "int32bool", "versions": "0+",
      "about": "TheWhether heartbeatto intervalinclude inauthorized millisecondsoperations." },
  ]
}

Response schema

Code Block
{
   { "nameapiKey": "Assignment"77,
  "type": "Assignmentresponse",
  "versionsname": "0+ShareGroupDescribeResponse",
  "nullableVersionsvalidVersions": "0+",
  "defaultflexibleVersions": "null0+",
  // Supported   "about"errors:
 "null if// not provided; the assignment otherwise.", "fields": [- GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "TopicPartitionsThrottleTimeMs", "type": "[]TopicPartitionsint32", "versions": "0+",
          "about": "The duration in partitionsmilliseconds assignedfor towhich the member." }
    ]}
  ],
  "commonStructs": [request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "TopicPartitions[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "TopicIdErrorCode", "type": "uuidint16", "versions": "0+",
          "about": "The topic ID describe error, or 0 if there was no error." },
        { "name": "PartitionsErrorMessage", "type": "[]int32string", "versions": "0+",
          "aboutnullableVersions": "The partitions." }
0+", "default": "null",
      ]}
  ]
}

ShareGroupDescribe API

The ShareGroupDescribe API is used to describe share groups.

Request schema

Code Block
{
  "apiKeyabout": NN,
  "type "The top-level error message, or null if there was no error." },
        { "name": "requestGroupId",
  "listenerstype": ["brokerstring"],
  "nameversions": "ShareGroupDescribeRequest0+",
  "validVersionsentityType": "0groupId",
          "flexibleVersionsabout": "0+"The group ID string." },
  "fields": [
      { "name": "GroupIdsGroupState", "type": "[]string", "versions": "0+", "entityType": "groupId",

          "about": "The ids of group state string, or the groups to describeempty string." },
        { "name": "IncludeAuthorizedOperationsGroupEpoch", "type": "boolint32", "versions": "0+",
          "about": "WhetherThe to include authorized operationsgroup epoch." },
    ]
}

Response schema

Code Block
{
  "apiKey": NN,
 { "typename": "responseAssignmentEpoch",
  "nametype": "ShareGroupDescribeResponseint32",
  "validVersionsversions": "0+",
  "flexibleVersions        "about": "0+"The assignment epoch." },
   // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "ThrottleTimeMsMembers", "type": "int32[]Member", "versions": "0+",
          "about": "The durationmembers.",
 in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
"fields": [
            { "name": "GroupsMemberId", "type": "[]DescribedGroupstring", "versions": "0+",
              "about": "EachThe describedmember groupID." },
      "fields": [
        { "name": "ErrorCodeRackId", "type": "int16string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The describemember error, or 0 if there was no errorrack ID." },
            { "name": "ErrorMessageMemberEpoch", "type": "stringint32", "versions": "0+",
 "nullableVersions": "0+", "default": "null",
          "about": "The top-levelcurrent error message, or null if there was no errormember epoch." },
            { "name": "GroupIdClientId", "type": "string", "versions": "0+", "entityType": "groupId",

              "about": "The groupclient ID string." },
            { "name": "GroupStateClientHost", "type": "string", "versions": "0+",
              "about": "The groupclient statehost." string}, or
 the empty string." },
        { "name": "GroupEpochSubscribedTopicNames", "type": "int32[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed grouptopic epochnames." },
            { "name": "AssignmentEpochAssignment", "type": "int32Assignment", "versions": "0+",
              "about": "The current assignment epoch." }
          ]},
        { "name": "AssignorNameAuthorizedOperations", "type": "stringint32", "versions": "0+", "default": "-2147483648",
          "about": "The selected assignor32-bit bitfield to represent authorized operations for this group." },
      ]
    }
  ],
 { "namecommonStructs": "Members", [
    { "typename": "[]MemberTopicPartitions", "versions": "0+",
 "fields": [
       { "aboutname": "The members.",
          "fields": [
            { "name": "MemberId""TopicId", "type": "stringuuid", "versions": "0+",
              "about": "The membertopic ID." },
            { "name": "RackIdTopicName", "type": "string", "versions": "0+", "nullableVersionsentityType": "0+topicName", "default": "null",
              "about": "The membertopic rack IDname." },
            { "name": "MemberEpochPartitions", "type": "[]int32", "versions": "0+",
              "about": "The current member epochpartitions." },
        ]},
    { "name": "ClientIdAssignment", "type": "string", "versions": "0+",
              "about"fields": "The client ID." },
  [
          { "name": "ClientHostTopicPartitions", "type": "string[]TopicPartitions", "versions": "0+",
              "about": "The client host assigned topic-partitions to the member." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions]}
  ]
}

ShareFetch API

The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.

The first request from a share consumer to a share-partition leader broker establishes a share session by setting MemberId to the member ID it received from the group coordinator and ShareSessionEpoch to 0. Then each subsequent ShareFetch or ShareAcknowledge  request specifies the MemberId  and increments the ShareSessionEpoch  by one. When the share consumer wishes to close the share session, it sets MemberId  to the member ID and ShareSessionEpoch  to -1.

When piggybacking acknowledgements in this request, there are a few special cases.

  • If acknowledgements are being made for a partition and no records should be fetched, PartitionMaxBytes  should be set to zero.
  • If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the Topics array with PartitionMaxBytes  set to zero AND the partition is included in ForgottenTopicsData .
  • If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the Topics  array and ShareSessionEpoch  is set to -1. No data will be fetched and it is not necessary to include the partition in ForgottenTopicsData .
  • If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the AcknowledgeErrorCode  in the response will be set to the same value for all partitions which had piggybacked acknowledgements.

Request schema

For the AcknowledgementBatches of each topic-partition, the FirstOffsets  must be ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

Code Block
{
  "apiKey": 78,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "defaultfields": "-2147483648",[
    {     "name": "GroupId", "abouttype": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", ""string", "versions": "0+", "fieldsnullableVersions": [
      { "name": "TopicId"0+", "typedefault": "uuidnull", "versionsentityType": "0+groupId",
        "about": "The topicgroup IDidentifier." },
      { "name": "TopicNameMemberId", "type": "string", "versions": "0+", "entityTypenullableVersions": "topicName0+",
        "about": "The topicmember nameID." },
      { "name": "PartitionsShareSessionEpoch", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ] current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    {  {"name": "MaxWaitMs", "nametype": "Assignmentint32", "versions": "0+",
      "fieldsabout": [
       "The maximum time in milliseconds to wait for the response." },
    { "name": "TopicPartitionsMinBytes", "type": "[]TopicPartitionsint32", "versions": "0+",
        "about": "The assignedminimum topic-partitionsbytes to accumulate in the memberresponse." },
     ]}
  ]
}

ShareFetch API

The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.

The first request from a share consumer to a share-partition leader broker establishes a share session by setting MemberId to the member ID it received from the group coordinator and ShareSessionEpoch to 0. Then each subsequent ShareFetch or ShareAcknowledge  request specifies the MemberId  and increments the ShareSessionEpoch  by one. When the share consumer wishes to close the share session, it sets MemberId  to the member ID and ShareSessionEpoch  to -1.

When piggybacking acknowledgements in this request, there are a few special cases.

  • If acknowledgements are being made for a partition and no records should be fetched, PartitionMaxBytes  should be set to zero.
  • If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the Topics array with PartitionMaxBytes  set to zero AND the partition is included in ForgottenTopicsData .
  • If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the Topics  array and ShareSessionEpoch  is set to -1. No data will be fetched and it is not necessary to include the partition in ForgottenTopicsData .
  • If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the AcknowledgeErrorCode  in the response will be set to the same value for all partitions which had piggybacked acknowledgements.

Request schema

For the AcknowledgementBatches of each topic-partition, the FirstOffsets  must be ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

Code Block
{
  "apiKey": NN,
 { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "requestuuid",
  "listenersversions": ["broker0+"],
  "nameignorable": "ShareFetchRequest"true,
  "validVersionsabout": "0",
The unique "flexibleVersions": "0+"topic ID."},
  "fields": [
    { "name": "GroupIdPartitions", "type": "string[]FetchPartition", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType
        "about": "groupId",
The partitions    to fetch.", "aboutfields": "The[
  group identifier." },
    { "name": "MemberIdPartitionIndex", "type": "stringint32", "versions": "0+", "nullableVersions": "0+",

          "about": "The memberpartition IDindex." },
        { "name": "ShareSessionEpochPartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
          { "name": "MaxWaitMsAcknowledgementBatches", "type": "int32[]AcknowledgementBatch", "versions": "0+",
          "about": "TheRecord batches maximum time in milliseconds to wait for the response." },
to acknowledge.", "fields": [
          { "name": "MinBytesFirstOffset", "type": "int32int64", "versions": "0+",
            "about": "TheFirst minimumoffset bytesof tobatch accumulateof inrecords theto responseacknowledge." },
    { "name": "MaxBytes     { "name": "LastOffset", "type": "int32int64", "versions": "0+",
            "about": "Last "defaultoffset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "0x7fffffff[]int8", "ignorableversions": true"0+",
            "about": "TheArray of maximumacknowledge bytestypes to fetch.  See KIP-74 for cases where this limit may not be honored." - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]},
    { "name": "TopicsForgottenTopicsData", "type": "[]FetchTopicForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The topicspartitions to fetch remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartitionint32", "versions": "0+",
        "about": "The partitions indexes to fetchforget.", "fields": [
        { "name }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": 78,
  "type": "PartitionIndexresponse",
  "typename": "int32ShareFetchResponse",
  "versionsvalidVersions": "0+",
          "aboutflexibleVersions": "The partition index." }0+",
  // Supported errors for ErrorCode and { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", AcknowledgeErrorCode:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
          { "name": "FirstOffsetThrottleTimeMs", "type": "int64int32", "versions": "0+",
 "ignorable": true,
          "about": "First offset of batch of records to acknowledge."},
          The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "LastOffsetErrorCode", "type": "int64int16", "versions": "0+",
      "ignorable": true,
      "about": "LastThe offset (inclusive) of batch of records to acknowledge."top-level response error code." },
          { "name": "AcknowledgeTypesErrorMessage", "type": "[]int8string", "versions": "0+",
 "nullableVersions": "0+",    "default": "null",
      "about": "ArrayThe oftop-level acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]}error message, or null if there was no error." },
      { "name": "ForgottenTopicsDataResponses", "type": "[]ForgottenTopicShareFetchableTopicResponse", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share sessionresponse topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32PartitionData", "versions": "0+",
        "about": "The topic partitions indexes to forget." }, "fields": [
    ]}
    ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ShareFetchResponseErrorCode",
  "validVersionstype": "0int16",
  "flexibleVersionsversions": "0+",
    // Supported errors for ErrorCode and AcknowledgeErrorCode"about":
 "The //fetch - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
error code, or 0 if there was no fetch error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The fetch error message, or null if there was no fetch error." },
        { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no acknowledge error." },
        { "name": "ThrottleTimeMsAcknowledgeErrorMessage", "type": "int32string", "versions": "0+", "ignorablenullableVersions": true,
     "0+", "aboutdefault": "null"The,
 duration in milliseconds for which the request was throttled due to a quota violation"about": "The acknowledge error message, or zeronull if thethere requestwas didno not violate any quotaacknowledge error." }, 
        { "name": "ErrorCodeCurrentLeader", "type": "int16LeaderIdAndEpoch", "versions": "0+", "ignorablefields": true,
      "about": "The top-level response error code." },
[
          { "name": "ErrorMessageLeaderId", "type": "stringint32", "versions": "0+", "nullableVersions": "0+", "default": "null",

            "about": "The top-level error message, ID of the current leader or null-1 if therethe wasleader nois errorunknown." },
            { "name": "ResponsesLeaderEpoch", "type": "[]ShareFetchableTopicResponseint32", "versions": "0+",
            "about": "The latest responseknown leader topicsepoch." }
        ]},
 "fields": [
      { "name": "TopicIdRecords", "type": "uuidrecords", "versions": "0+", "ignorablenullableVersions": true"0+", "about": "The uniquerecord topic IDdata."},
        { "name": "PartitionsAcquiredRecords", "type": "[]PartitionDataAcquiredRecords", "versions": "0+",
        "about": "The topicacquired partitionsrecords.", "fields":  [
          { "name": "PartitionIndexFirstOffset", "type":  "int32int64", "versions": "0+",
 "about": "The earliest offset in this batch of  "about": "The partition indexacquired records." },
          { "name": "ErrorCodeLastOffset", "type": "int16int64", "versions": "0+",
          "about": "The fetchlast erroroffset code,of orthis 0batch ifof there was no fetch erroracquired records." },
          { "name": "ErrorMessageDeliveryCount", "type": "stringint16", "versions": "0+", "nullableVersionsabout": "0+", "default": "null",
      The delivery count of this batch of acquired records."}
    "about": "The fetch error message,]}
 or null if there was no]}
 fetch  error." ]},
        { "name": "AcknowledgeErrorCodeNodeEndpoints", "type": "int16[]NodeEndpoint", "versions": "0+",
          "about": "TheEndpoints acknowledgefor errorall code,current orleaders 0enumerated ifin therePartitionData waswith no acknowledge errorerror NOT_LEADER_OR_FOLLOWER." },
 "fields": [
      { "name": "AcknowledgeErrorMessageNodeId", "type": "stringint32", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "aboutmapKey": "The acknowledge error message, or null if there was no acknowledge error." }, 
        { "name": "CurrentLeadertrue, "entityType": "brokerId", "typeabout": "LeaderIdAndEpoch", "versions": "0+", "fields": [The ID of the associated node." },
          { "name": "LeaderIdHost", "type": "int32string", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknownnode's hostname." },
          { "name": "LeaderEpochPort", "type": "int32", "versions": "0+",
            "about": "The latest known leader epochnode's port." }
        ]},
        { "name": "RecordsRack", "type": "recordsstring", "versions": "0+", "nullableVersions": "0+", "aboutdefault": "The record data."}null",
        { "nameabout": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "FirstOffset", "type":  "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

ShareAcknowledge API

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

For the AcknowledgementBatches of each topic-partition, the FirstOffsets  must be ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

Code Block
{
  "apiKey": 79,
  "type": "request",
  "listeners": ["broker"],
  "name": "LastOffsetShareAcknowledgeRequest",
  "typevalidVersions": "int640",
  "versionsflexibleVersions": "0+",
  "aboutfields": "The last offset of this batch of acquired records."},[
         { {"name": "DeliveryCountGroupId", "type": "int16string", "versions": "0+", "aboutnullableVersions": "The delivery count of this batch of acquired records."}0+", "default": "null", "entityType": "groupId",
      "about": "The ]}
      ]}
    ]group identifier." },
      { "name": "NodeEndpointsMemberId", "type": "[]NodeEndpointstring", "versions": "0+", "nullableVersions": "0+",
      "about": "EndpointsThe for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
  member ID." },
    { "name": "NodeIdShareSessionEpoch", "type": "int32", "versions": "0+",
        "mapKeyabout": true, "entityType": "brokerId", "about": "The ID of the associated node "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
        { "name": "HostTopics", "type": "string[]AcknowledgeTopic", "versions": "0+",
        "about": "The node's hostname." }, topics containing records to acknowledge.", "fields": [
      { "name": "PortTopicId", "type": "int32uuid", "versions": "0+",
        "about": "The unique node'stopic portID." },
      { "name": "RackPartitions", "type": "string[]AcknowledgePartition", "versions": "0+", "nullableVersions": "0+", "default
        "about": "null",
The partitions containing records to acknowledge.", "fields": [
        { "aboutname": "PartitionIndex"The rack of the node, or null if it has not been assigned to a rack, "type": "int32", "versions": "0+",
          "about": "The partition index." },
    ]}
  ]
}

ShareAcknowledge API

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

For the AcknowledgementBatches of each topic-partition, the FirstOffsets  must be ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

Code Block
{
  "apiKey": NN,
     { "name": "AcknowledgementBatches", "type": "request[]AcknowledgementBatch",
  "listenersversions": ["broker0+"],
      "name": "ShareAcknowledgeRequest",
    "validVersionsabout": "0",
Record batches "flexibleVersions": "0+to acknowledge.",
  "fields": [
          { "name": "GroupIdFirstOffset", "type": "stringint64", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    
            "about": "First offset of batch of records to acknowledge."},
          { "name": "MemberIdLastOffset", "type": "stringint64", "versions": "0+", "nullableVersions": "0+",

            "about": "The member ID." Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "ShareSessionEpochAcknowledgeTypes", "type": "int32[]int8", "versions": "0+",
            "about": "TheArray currentof shareacknowledge sessiontypes epoch:- 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
 :Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": 79,
  "type": "response",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicIdShareAcknowledgeResponse",
  "typevalidVersions": "uuid0",
  "versionsflexibleVersions": "0+",
 "about": "The unique topic ID."}, // Supported errors:
  // -   { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
    GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "AcknowledgementBatchesThrottleTimeMs", "type": "[]AcknowledgementBatchint32", "versions": "0+",
 "ignorable": true,
        "about": "Record batches to acknowledge.", "fields": [
       The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "FirstOffsetErrorCode", "type": "int64int16", "versions": "0+",
 "ignorable": true,
          "about": "FirstThe offsettop oflevel batchresponse of records to acknowledgeerror code." },
          { "name": "LastOffsetErrorMessage", "type": "int64string", "versions": "0+",
 "nullableVersions": "0+",    "default": "null",
      "about": "Last offset (inclusive) of batch of records to acknowledge."The top-level error message, or null if there was no error." },
           { "name": "AcknowledgeTypesResponses", "type": "[]int8ShareAcknowledgeTopicResponse", "versions": "0+",
            "about": "ArrayThe of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
  response topics.", "fields": [
    ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKeyname": NN"TopicId",
  "type": "responseuuid",
  "nameversions": "ShareAcknowledgeResponse0+",
  "validVersionsignorable": "0"true,
  "flexibleVersionsabout": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ThrottleTimeMsErrorMessage", "type": "int32string", "versions": "0+", "ignorablenullableVersions": true,
      "about"0+", "default": "Thenull",
 duration in milliseconds for which the request was throttled due to a quota violation"about": "The error message, or zeronull if thethere requestwas did not violate any quotano error." },
        { "name": "ErrorCodeCurrentLeader", "type": "int16LeaderIdAndEpoch", "versions": "0+", "ignorablefields": true,[
      "about": "The top level response error code." },
    { "name": "ErrorMessageLeaderId", "type": "stringint32", "versions": "0+", "nullableVersions": "0+", "default": "null",

            "about": "The top-level error message, ID of the current leader or null-1 if therethe wasleader nois errorunknown." },
           { "name": "ResponsesLeaderEpoch", "type": "[]ShareAcknowledgeTopicResponseint32", "versions": "0+",
            "about": "The latest known responseleader topicsepoch.", "fields": [
   }
        ]}
      ]}
    ]},
    { "name": "TopicIdNodeEndpoints", "type": "uuid[]NodeEndpoint", "versions": "0+", "ignorable": true,
      "about": "The unique topic ID."},Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "PartitionsNodeId", "type": "[]PartitionDataint32", "versions": "0+",
        "aboutmapKey": true, "entityType"The topic partitions.: "brokerId", "fieldsabout": [
"The ID of the associated node." },
      { "name": "PartitionIndexHost", "type": "int32string", "versions": "0+",
          "about": "The partitionnode's indexhostname." },
        { "name": "ErrorCodePort", "type": "int16int32", "versions": "0+",
          "about": "The error code, or 0 if there was no errornode's port." },
        { "name": "ErrorMessageRack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error messagerack of the node, or null if there was no error it has not been assigned to a rack." },
         { "name]}
  ]
}

AlterShareGroupOffsets API

The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "CurrentLeaderrequest",
  "listeners": "type["broker"],
  "name": "LeaderIdAndEpochAlterShareGroupOffsetsRequest",
  "validVersions": "versions"0",
  "flexibleVersions": "0+",
  "fields": [
          { "{ "name": "LeaderIdGroupId", "type": "int32string", "versions": "0+",
            "entityType": "groupId",
      "about": "The ID of the current leader or -1 if the leader is unknowngroup identifier." },
          { "name": "LeaderEpochTopics", "type": "int32[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
            "about": "The topics latestto knownalter leaderoffsets epochfor.", }
        ]}
      ]}
    ]},
"fields": [
      { "name": "NodeEndpointsTopicName", "type": "[]NodeEndpointstring", "versions": "0+",
      "aboutentityType": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "NodeIdPartitions", "type": "int32[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
        "mapKeyabout": true, "entityType": "brokerId" "Each partition to alter offsets for.", "aboutfields": "The[
 ID of the associated node." },
      { "name": "HostPartitionIndex", "type": "stringint32", "versions": "0+",
          "about": "The node'spartition hostnameindex." },
              { "name": "PortStartOffset", "type": "int32int64", "versions": "0+",
                  "about": "The node's portshare-partition start offset." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." ]}
    ]}
   ]
}

AlterShareGroupOffsets API

The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The group coordinator serves this API.

...

Response schema

Code Block
{
    "apiKey": NN,
    "type": "requestresponse",
  "listeners": ["broker"],
  "  "name": "AlterShareGroupOffsetsRequestAlterShareGroupOffsetsResponse",
    "validVersions": "0",
    "flexibleVersions": "0+",
  // Supported "fields"errors: [
  // - { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to alter offsets for.",  GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
      { "name": "TopicNameThrottleTimeMs", "type": "stringint32", "versions": "0+", "entityTypeignorable": "topicName", "mapKey": true,
        "about": "The topic name duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "PartitionsResponses", "type": "[]AlterShareGroupOffsetsRequestPartitionAlterShareGroupOffsetsResponseTopic", "versions": "0+",
        "about": "EachThe partitionresults tofor altereach offsets fortopic.", "fields": [
        { "name": "PartitionIndexTopicName", "type": "int32string", "versions": "0+",
 "entityType": "topicName",
        "about": "The partitiontopic indexname." },
              { "name": "StartOffsetTopicId", "type": "int64uuid", "versions": "0+",
           "ignorable": true,
        "about": "The share-partitionunique starttopic offsetID." },
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "AlterShareGroupOffsetsResponsePartitionIndex",
  "validVersionstype": "0int32",
  "flexibleVersionsversions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The durationerror inmessage, millisecondsor fornull whichif the requestthere was throttledno due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type":error." }
      ]}
    ]}
  ]
}

DeleteShareGroupOffsets API

The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupOffsetsRequest",
  "validVersions "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
      "about  "flexibleVersions": "The results for each topic.0+", 
  "fields": [
      { "name": "TopicNameGroupId", "type": "string", "versions": "0+", "entityType": "topicNamegroupId",
              "about": "The topicgroup nameidentifier." },
      { "name": "TopicIdTopics", "type": "uuid[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+", "ignorable": true,
        "about": "The uniquetopics topicto ID." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "delete offsets for.",  "fields": [
        { "name": "PartitionIndexTopicName", "type": "int32string", "versions": "0+",
 "entityType": "topicName",
        "about": "The partitiontopic indexname." },
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessageDeleteShareGroupOffsetsResponse", "type": "string",
  "versionsvalidVersions": "0+",
  "nullableVersionsflexibleVersions": "0+", "ignorable": true, "default": "null",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  //  "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

DeleteShareGroupOffsets API

The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "GroupIdThrottleTimeMs", "type": "stringint32", "versions": "0+", "entityTypeignorable": "groupId"true,
            "about": "The group identifierduration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "TopicsResponses", "type": "[]DeleteShareGroupOffsetsRequestTopicDeleteShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The topicsresults tofor deleteeach offsets fortopic.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
 { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." }
      { "name": "DeleteShareGroupOffsetsResponseErrorCode",
  "validVersionstype": "0int16",
  "flexibleVersionsversions": "0+",
      // Supported errors"about":
 "The //error - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
        "about": "The error message, or null if there was no error." }
    ]}
  ]
}

DescribeShareGroupOffsets API

The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMsGroupId", "type": "int32string", "versions": "0+", "ignorableentityType": true"groupId",
            "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quotagroup identifier." },
    { "name": "ResponsesTopics", "type": "[]DeleteShareGroupOffsetsResponseTopicDescribeShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics resultsto fordescribe eachoffsets topicfor.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicIdPartitions", "type": "uuid[]int32", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." partitions." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ErrorCodeDescribeShareGroupOffsetsResponse",
  "typevalidVersions": "int160",
  "versionsflexibleVersions": "0+",
  // Supported errors:
  //  "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions- GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "default": "null",
        "about": "The duration errorin message,milliseconds orfor nullwhich ifthe thererequest was nothrottled error." }
    ]}
  ]
}

DescribeShareGroupOffsets API

The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeShareGroupOffsetsRequest",
  "validVersionsdue to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
  "flexibleVersions      "about": "0+The results for each topic.",
   "fields": [
      { "name": "GroupIdTopicName", "type": "string", "versions": "0+", "entityType": "groupIdtopicName",
              "about": "The grouptopic identifiername." },
      { "name": "TopicsTopicId", "type": "[]DescribeShareGroupOffsetsRequestTopicuuid", "versions": "0+", "ignorable": true,
        "about": "The topics to describe offsets for.",  " unique topic ID." },
      { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "TopicNamePartitionIndex", "type": "stringint32", "versions": "0+",
 "entityType": "topicName",
        "about": "The topicpartition nameindex." },
        { "name": "PartitionsStartOffset", "type": "[]int32int64", "versions": "0+",
          "about": "The partitions share-partition start offset." },
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponseErrorCode",
  "validVersionstype": "0int16",
  "flexibleVersionsversions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The durationerror inmessage, millisecondsor fornull whichif the requestthere was throttledno due to a quota violation, or zero if the error." }
      ]}
    ]}
  ]
}

InitializeShareGroupStateAPI

The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitializeShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
 request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicNameGroupId", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topicgroup nameidentifier." },
      { "name": "TopicIdTopics", "type": "uuid[]InitializeStateData", "versions": "0+", "ignorable": true,
        "about": "The data uniquefor topicthe IDtopics." },, "fields": [
      { "name": "PartitionsTopicId", "type": "[]DescribeShareGroupOffsetsResponsePartitionuuid", "versions": "0+",
        "fieldsabout": "The [
topic identifier." },
      { "name": "PartitionIndexPartitions", "type": "int32[]PartitionData", "versions": "0+",
          "about": "The partition indexdata for the partitions." },, "fields": [
        { "name": "StartOffsetPartition", "type": "int64int32", "versions": "0+",
          "about": "The share-partition start offsetindex." },
        { "name": "ErrorCodeStateEpoch", "type": "int16int32", "versions": "0+",
          "about": "The errorstate code,epoch orof 0 if there was no errorthe share-partition." },
        { "name": "ErrorMessageStartOffset", "type": "stringint64", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The share-partition errorstart messageoffset, or null-1 if there was no error the start offset is not being initialized." }
      ]}
    ]}
  ]
}

InitializeShareGroupStateAPI

The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.

...

Response schema

Code Block
{
  "apiKey": NN,
  "type": "requestresponse",
  "listenersname": ["brokerInitializeShareGroupStateResponse"],
  "namevalidVersions": "InitializeShareGroupStateRequest0",
  "validVersionsflexibleVersions": "0+",
  // -  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "TopicsResults", "type": "[]InitializeStateDataInitializeStateResult", "versions": "0+",
      "about": "The data for the topics.initialization results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionDataPartitionResult", "versions": "0+",
        "about":  "The dataresults for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpochErrorCode", "type": "int32int16", "versions": "0+",
          "about": "The stateerror epochcode, ofor the share-partition0 if there was no error." },
        { "name": "StartOffsetErrorMessage", "type": "int64string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The share-partition start offseterror message, or -1null if thethere startwas offset is not being initializedno error." }
        ]}
    ]}
  ]
}

...

ReadShareGroupState API

The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["responsebroker"],
  "name": "InitializeShareGroupStateResponseReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  "fields": [
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [{ "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "ResultsTopics", "type": "[]InitializeStateResultReadStateData", "versions": "0+",
      "about": "The data for initializationthe resultstopics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionResultPartitionData", "versions": "0+",
        "about" : "The resultsdata for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCodeLeaderEpoch", "type": "int16int32", "versions": "0+",
          "about":, "The errorleader code,epoch orof 0 if there was no errorthe share-partition." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupState API

The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

...

Response schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"]response",
  "name": "ReadShareGroupStateRequestReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
   "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },// - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "TopicsResults", "type": "[]ReadStateDataReadStateResult", "versions": "0+",
      "about": "The data for the topics.read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionDataPartitionResult", "versions": "0+",
        "about":  "The dataresults for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpochErrorCode", "type": "int32int16", "versions": "0+",
          "about",: "The leader epoch of the share-partition error code, or 0 if there was no error." },
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKeyname": NN"ErrorMessage",
  "type": "responsestring",
  "nameversions": "ReadShareGroupStateResponse0+",
  "validVersionsnullableVersions": "0+",
  "flexibleVersionsdefault": "0+null",
   // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    "about": "The error message, or null if there was no error." }
        { "name": "ResultsStateEpoch", "type": "[]ReadStateResultint32", "versions": "0+",
          "about": "The read results", "fields": [
 state epoch of the share-partition." },
        { "name": "TopicIdStartOffset", "type": "uuidint64", "versions": "0+",
          "about": "The topic identifier share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "PartitionsStateBatches", "type": "[]PartitionResultStateBatch", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
          { "name": "PartitionFirstOffset", "type": "int32int64", "versions": "0+",
            "about": "The partition indexfirst offset of this state batch." },
          { "name": "ErrorCodeLastOffset", "type": "int16int64", "versions": "0+",
            "about": "The errorlast code,offset orof 0this if there was no errorstate batch." },
          { "name": "ErrorMessageDeliveryState", "type": "stringint8", "versions": "0+",
            "nullableVersionsabout": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
       The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "StateEpochDeliveryCount", "type": "int32int16", "versions": "0+",
            "about": "The state epoch of the share-partitiondelivery count." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",]}
      ]}
    ]}
  ]
}

WriteShareGroupState API

The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
       "about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "StateBatchesWriteShareGroupStateRequest",
  "typevalidVersions": "[]StateBatch0",
  "versionsflexibleVersions": "0+",
  "fields": [
          { "name": "FirstOffsetGroupId", "type": "int64string", "versions": "0+",
            "about": "The firstgroup offset of this state batch." },
     identifier." },
     { "name": "LastOffsetTopics", "type": "int64[]WriteStateData", "versions": "0+",
            "about": "The lastdata offsetfor of this state batchthe topics." },
 "fields": [
        { "name": "DeliveryStateTopicId", "type": "int8uuid", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archivedtopic identifier." },
          { "name": "DeliveryCountPartitions", "type": "int16[]PartitionData", "versions": "0+",
            "about": "The data for deliverythe countpartitions." }
        ]}
      ]}
, "fields": [
      ]}
  ]
}

WriteShareGroupState API

The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKeyname": NN"Partition",
  "type": "requestint32",
  "listenersversions": ["broker0+"],
    "name": "WriteShareGroupStateRequest",
      "validVersionsabout": "0",
The partition "flexibleVersionsindex.": "0+"},
   "fields": [
    { "name": "GroupIdStateEpoch", "type": "stringint32", "versions": "0+",
          "about": "The group identifier state epoch of the share-partition." },
        { "name": "TopicsLeaderEpoch", "type": "[]WriteStateDataint32", "versions": "0+",
          "about": "The dataleader epoch forof the topicsshare-partition." },
 "fields": [
      { "name": "TopicIdStartOffset", "type": "uuidint64", "versions": "0+",
          "about": "The topic identifiershare-partition start offset, or -1 if the start offset is not being written." },
        { "name": "PartitionsStateBatches", "type": "[]PartitionDataStateBatch", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
          { "name": "PartitionFirstOffset", "type": "int32int64", "versions": "0+",
            "about": "The partition index first offset of this state batch." },
          { "name": "StateEpochLastOffset", "type": "int32int64", "versions": "0+",
            "about": "The statelast epochoffset of this thestate share-partitionbatch." },
          { "name": "LeaderEpochDeliveryState", "type": "int32int8", "versions": "0+",
            "about": "The leaderdelivery epochstate of the share-partition.- 0:Available,2:Acked,4:Archived" },
          { "name": "StartOffsetDeliveryCount", "type": "int64int16", "versions": "0+",
            "about": "The share-partition start offset, or -1 if the start offset is not being written." }, delivery count." }
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
   {"apiKey": NN,
  "nametype": "StateBatchesresponse",
  "typename": "[]StateBatchWriteShareGroupStateResponse",
  "versionsvalidVersions": "0+",
  "fieldsflexibleVersions": ["0+",
  // - NOT_COORDINATOR (version 0+)  
  //  { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "LastOffsetResults", "type": "int64[]WriteStateResult", "versions": "0+",
            "about": "The lastwrite offset of this state batch." },results", "fields": [
          { "name": "DeliveryStateTopicId", "type": "int8uuid", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archivedtopic identifier" },
          { "name": "DeliveryCountPartitions", "type": "int16[]PartitionResult", "versions": "0+",
            "about": "The results for deliverythe countpartitions.", "fields": }[
         ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "{ "name": "WriteShareGroupStateResponsePartition",
  "validVersionstype": "0int32",
  "flexibleVersionsversions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
"about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ResultsErrorMessage", "type": "[]WriteStateResultstring", "versions": "0+",
      "aboutnullableVersions": "0+"The write results, "default": "null",
 "fields": [         "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

DeleteShareGroupState API

The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "nameapiKey": "TopicId"NN,
  "type": "uuidrequest",
  "versionslisteners": ["0+broker"],
  "name": "DeleteShareGroupStateRequest",
  "validVersions": "0",
  "aboutflexibleVersions": "0+"The,
 topic identifier"fields": },[
      { "name": "PartitionsGroupId", "type": "[]PartitionResultstring", "versions": "0+",
        "about" : "The results for the partitionsgroup identifier.", "fields": [
     },
    { "name": "PartitionTopics", "type": "int32[]DeleteStateData", "versions": "0+",
          "about": "The partition indexdata for the topics." },
 "fields": [
      { "name": "ErrorCodeTopicId", "type": "int16uuid", "versions": "0+",
          "about": "The error code, or 0 if there was no errortopic identifier." },
        { "name": "ErrorMessagePartitions", "type": "string[]PartitionData", "versions": "0+",
        "nullableVersionsabout": "0+The data for the partitions.", "defaultfields": "null",[
        { "name": "Partition", "abouttype": "int32"The error message, or null if there was no error, "versions": "0+",
          "about": "The partition index." }
        ]}
    ]}
   ]
}

DeleteShareGroupState API

The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

...

Response schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"]response",
  "name": "DeleteShareGroupStateRequestDeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]DeleteStateData", "versions": "0+",
      "// - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
      "about": "The data for the topics.delete results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionDataPartitionResult", "versions": "0+",
        "about":  "The dataresults for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
      ]}
  {  ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "DeleteShareGroupStateResponseErrorMessage",
 "type": "validVersions"string", "versions": "0+",
  "flexibleVersionsnullableVersions": "0+", "default": "null",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
      "about": "The delete results", "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupStateSummary API

The ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateSummaryRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadStateSummaryData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateSummaryResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateSummaryResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

Records

This section describes the new record types.

Group and assignment metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets  topic.

With the exception of the ShareGroupStatePartitionMetadata record, all of the record types are analogous to those introduced in KIP-848 for consumer groups.

For each share group, a ShareGroupMetadata record is written for the group epoch, and a ShareGroupPartitionMetadata record is written for the topic-partitions being assigned by the group. When the group is deleted, a tombstone records are written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.

For each share group, a ShareGroupTargetAssignmentMetadata record is written to record the group epoch used to compute the assignment. For each member, there is a ShareGroupTargetAssignmentMember record which persists the target assignment, and a ShareGroupCurrentMemberAssignment record which persists the current assignment and is also used to keep track of the member epoch.

There is also a ShareGroupStatePartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.

ShareGroupMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataKey",
  "validVersions": "11",
  "flexibleVersions": "none",
  "fields": [
      { "name": "TopicIdGroupId", "type": "uuidstring", "versions": "0+11",
        "about": "The topicgroup identifierid." },
   ]
}

ShareGroupMetadataValue

Code Block
{
   { "nametype": "Partitionsdata",
  "typename": "[]PartitionResultShareGroupMetadataValue",
  "versionsvalidVersions": "0+",
        "aboutflexibleVersions" : "The results for the partitions.0+",
  "fields": [
        { "name": "PartitionEpoch", "type": "int32", "versions": "0+",
          "about": "The partitiongroup indexepoch." },
  ]
}

ShareGroupPartitionMetadataKey

Code Block
{
      { "name"type": "ErrorCodedata",
  "typename": "int16ShareGroupPartitionMetadataKey",
  "versionsvalidVersions": "0+9",
          "aboutflexibleVersions": "none"The error code,
 or 0 if there was no error." },
     "fields": [
    { "name": "ErrorMessageGroupId", "type": "string", "versions": "0+9", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]group id." }
  ]
}

ReadShareGroupStateSummary API

The ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

...

ShareGroupPartitionMetadataValue

Code Block
{
  "apiKeytype": NN"data",
  "typename": "requestShareGroupPartitionMetadataValue",
  "listenersvalidVersions": ["broker0"],
  "nameflexibleVersions": "ReadShareGroupStateSummaryRequest0+",
  "validVersionsfields": [
    { "name": "0Topics",
  "flexibleVersionsversions": "0+", "type": "[]TopicMetadata",
      "about": "The list of topic metadata.", "fields": [
      { "name": "GroupIdTopicId", "typeversions": "string0+", "versionstype": "0+uuid",
        "about": "The grouptopic identifierid." },
      { "name": "TopicsTopicName", "typeversions": "[]ReadStateSummaryData0+", "versionstype": "0+string",
        "about": "The data for the topicstopic name.", "fields": [ },
      { "name": "TopicIdNumPartitions", "typeversions": "uuid0+", "versionstype": "0+int32",
        "about": "The number of partitions of the topic identifier." },
      { "name": "PartitionsPartitionMetadata", "typeversions": "[]PartitionData0+", "versionstype": "0+[]PartitionMetadata",
        "about": "Partitions mapped to  "The data for the partitions.a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [
        { "name": "Partition", "typeversions": "int320+", "versionstype": "0+int32",
          "about": "The partition indexnumber." },
        { "name": "LeaderEpochRacks", "typeversions": "int320+", "versionstype": "0+[]string",
          "about": "The set leaderof epochracks ofthat the share-partitionpartition is mapped to." }
      ]}
    ]}
  ]
}

...

ShareGroupMemberMetadataKey

Code Block
{
  "apiKeytype": NN"data",
  "typename": "responseShareGroupMemberMetadataKey",
  "namevalidVersions": "ReadShareGroupStateSummaryResponse10",
  "validVersionsflexibleVersions": "0none",
  "flexibleVersionsfields": [
    { "name": "GroupId", "type": "string", "versions": "0+10",
  // - NOT_COORDINATOR  "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "10",
      "about": "The member id." }
  ]
}

ShareGroupMemberMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",(version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "ResultsRackId", "typeversions": "[]ReadStateSummaryResult0+", "versionsnullableVersions": "0+", "type": "string",
      "about": "The read results", "fields": [(optional) rack id." },
      { "name": "TopicIdClientId", "typeversions": "uuid0+", "versionstype": "0+string",
        "about": "The topicclient identifierid." },
      { "name": "PartitionsClientHost", "typeversions": "[]PartitionResult0+", "versionstype": "0+string",
        "about" : "The results for the partitionsclient host.", "fields": [
     },
    { "name": "PartitionSubscribedTopicNames", "typeversions": "int320+", "versionstype": "0+[]string",
      "about": "The list of subscribed topic "aboutnames." }
  ]
}

ShareGroupTargetAssignmentMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMetadataKey"The,
 partition index."validVersions": }"12",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ErrorCodeGroupId", "type": "int16string", "versions": "0+12",
          "about": "The error code, or 0 if group id." }
  ]
}

ShareGroupTargerAssignmentMetadataValue

Code Block
{
  "type": "data"there was no error." },
  "name": "ShareGroupTargetAssignmentMetadataValue",
     {"validVersions": "0",
  "nameflexibleVersions": "ErrorMessage0+",
  "typefields": "string",[
    { "versionsname": "0+AssignmentEpoch", "nullableVersionsversions": "0+", "defaulttype": "nullint32",
          "about": "The error message, or null if there was no errorassignment epoch." }
  ]
}

ShareGroupTargetAssignmentMemberKey

Code Block
{
  "type      { "name": "StateEpochdata",
  "typename": "int32ShareGroupTargetAssignmentMemberKey",
  "versionsvalidVersions": "0+13",
          "aboutflexibleVersions": "The state epoch of the share-partition." }none",
    "fields": [
    { "name": "StartOffsetGroupId", "type": "int64string", "versions": "0+13",
          "about": "The share-partition start offsetgroup id." },
    {  ]}
    ]}
  ]
}

Records

This section describes the new record types.

Group metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets  topic.

For each share group, a single ShareGroupMetadata record is written. When the group is deleted, a tombstone record is written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.

There is also a ShareGroupPartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.

...

"name": "MemberId", "type": "string", "versions": "13",
      "about": "The member id." }
  ]
}

ShareGroupTargetAssignmentMemberValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataKeyShareGroupTargetAssignmentMemberValue",
  "validVersions": "110",
  "flexibleVersions": "none0+",
  "fields": [
    { "name": "GroupIdTopicPartitions", "typeversions": "string0+", "versionstype": "11[]TopicPartition",
      "about": "The groupassigned id." }
  ]
}

ShareGroupMetadataValue

partitions.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "Partitions", "versions": "0+", "type": "[]int32" }
    ]}
  ]
}

ShareGroupCurrentMemberAssignmentKey

Code Block
{
  "type": "data",
  "name": "ShareGroupCurrentMemberAssignmentKey",
  "validVersions": "14
Code Block
{
  "type": "data",
  "nameflexibleVersions": "ShareGroupMetadataValuenone",
  "validVersionsfields": [
    { "name": "0GroupId",
  "type": "string", "flexibleVersionsversions": "0+14",
      "fieldsabout": [ "The group id." },
    { "name": "EpochMemberId", "type": "int32string", "versions": "0+14",
      "about": "The groupmember epochid." }
  ]
}

...

ShareGroupCurrentMemberAssignmentValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataKeyShareGroupCurrentMemberAssignmentValue",
  "validVersions": "100",
  "flexibleVersions": "none0+",
  "fields": [
    { "name": "GroupIdMemberEpoch", "typeversions": "string0+", "versionstype": "10int32",
      "about": "The group id current member epoch that is expected from the member in the heartbeat request." },
    { "name": "MemberIdPreviousMemberEpoch", "typeversions": "string0+", "versionstype": "10int32",
      "about": "The member idIf the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
    ]
}

ShareGroupMemberMetadataValue

Code Block
{
  "typename": "dataState",
  "nameversions": "ShareGroupMemberMetadataValue0+",
  "validVersionstype": "0int8",
      "flexibleVersionsabout": "0+",
  "fields": [The member state. See MemberState for the possible values." },
    { "name": "RackIdAssignedPartitions", "versions": "0+", "nullableVersions": "0+", "type": "string[]TopicPartitions",
      "about": "The partitions assigned to (optionalor owned by) rackthis idmember." }
  ],
  "commonStructs": [
    { "name": "ClientIdTopicPartitions", "versions": "0+", "typefields": "string",[
      { "aboutname": "The client id." },
    { "nameTopicId", "type": "ClientHostuuid", "versions": "0+",
 "type": "string",
      "about": "The clienttopic hostId." },
      { "name": "SubscribedTopicNamesPartitions", "versionstype": "0+[]int32", "typeversions": "[]string0+",
        "about": "The partition list of subscribed topic names." Ids." }
    ]}
  ]
}

...

ShareGroupStatePartitionMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataKeyShareGroupStatePartitionMetadataKey",
  "validVersions": "915",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "915",
      "about": "The group id." }
  ]
}

...

ShareGroupStatePartitionMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo",
      "about": "The topics with initialized share-group state." },
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo",
      "about": "The topics whose share-group state is being deleted." }
  ],
  "commonStructs": [
    { "name": "TopicPartitionsInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." }
    ]}
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareSnapshotKey",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." }
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareUpdateKey",
  "validVersions": "1",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "01",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "01",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "01",
      "about": "The partition index." }
  ]
}

...

Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The total number of share group rebalances count and rate.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 

kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

num-partitions

Gauge

group-coordinator-metrics

protocol: share

The number of share partitions managed by group coordinator. 

kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share 

group-countGaugegroup-coordinator-metrics

protocol: share

state: {emptyEmpty|stableStable|deadDead} 

The number of share groups in respective state.kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={emptyEmpty|stableStable|deadDead} 

share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)

Meter

share-group-metrics


The total number of offsets acknowledged for share groupsacknowledgement requests.

kafka.server:type=share-group-coordinator-metrics,name=share-acknowledgement-rate,protocol=share

kafka.server:type=share-group-coordinator-metrics,name=share-acknowledgement-count,protocol=share 

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

share-group-metrics

ack-type:{acceptAccept,releaseRelease,rejectReject} 


The number of records acknowledged per acknowledgement type.

kafka.server:type=share-group-metrics,name=record-acknowledgement-rate,protocol=share,ack-type={acceptAccept,releaseRelease,rejectReject} 

kafka.server:type=share-group-metrics,name=record-acknowledgement-count,protocol=share,ack-type={acceptAccept,releaseRelease,rejectReject} 

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-group-metrics


The time taken to load the share partitions.

kafka.server:type=share-group-metrics,name=partition-load-time-avg,protocol=share 

kafka.server:type=share-group-metrics,name=partition-load-time-max,protocol=share  

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-coordinator-metrics


The time taken in milliseconds to load the share-group state from the share-group state partitions.

kafka.server:type=share-coordinator-metrics,name=partition-load-time-avg 

kafka.server:type=share-coordinator-metrcs,name=partition-load-time-max 

thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg)

Meter

share-coordinator-metrics


The fraction of time the share coordinator thread is idle.

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-min 

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-avg 

write (write-rate and write-total)

Meter

share-coordinator-metrics


The number of share-group state write calls per second.

kafka.server:type=share-coordinator-metrics,name=write-rate 

kafka.server:type=share-coordinator-metrics,name=write-total 

write-latency (write-latency-avg and write-latency-totalmax)

Meter

share-coordinator-metrics


The time taken for a share-group state write call, including the time to write to the share-group state topic.

kafka.server:type=share-coordinator-metrics,name=write-latency-avg 

kafka.server:type=share-coordinator-metrics,name=write-latency-max 

num-partitions

Gauge

share-coordinator-metrics


The number of partitions in the share-state topic.

kafka.server:type=share-coordinator-metrics,name=num-partitions 

...

The following new client metrics should be added:. Following the terminology in KIP-714, these are standard metrics, as opposed to required metrics.

Metric Name

Type

Group

Tags

Description

JMX Bean

Telemetry metric name (KIP-714)

last-poll-seconds-ago

Gauge

consumer-share-metrics

client-id 

The number of seconds since the last poll() invocation.

kafka.consumer:type=consumer-share-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+) 

org.apache.kafka.consumer.share.last.poll.seconds.ago 

time-between-poll-avg

Meter

consumer-share-metrics

client-id 

The average delay between invocations of poll() in milliseconds.

kafka.consumer:type=consumer-share-metrics,name=time-between-poll-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.time.between.poll.avg 

time-between-poll-max

Meter

consumer-share-metrics

client-id 

The maximum delay between invocations of poll() in milliseconds.

kafka.consumer:type=consumer-share-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+) 

o.a.k.consumer.share.time.between.poll.max 

poll-idle-ratio-avg

Meter

consumer-share-metrics

client-id 

The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records.

kafka.consumer:type=consumer-share-metrics,name=poll-idle-ratio-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.poll.idle.ratio.avg 

heartbeat-response-time-max

Meter

consumer-share-coordinator-metrics

client-id 

The maximum time taken to receive a response to a heartbeat request in milliseconds.

kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-response-time-max,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.heartbeat.response.time.max 

heartbeat-rate

Meter

consumer-share-coordinator-metrics

client-id 

The number of heartbeats per second.

kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.heartbeat.rate 

heartbeat-total

Meter

consumer-share-coordinator-metrics

client-id 

The total number of heartbeats.

kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-total,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.heartbeat.total 

last-heartbeat-seconds-ago

Gauge

consumer-share-coordinator-metrics

client-id 

The number of seconds since the last coordinator heartbeat was sent.

kafka.consumer:type=consumer-share-coordinator-metrics,name=last-heartbeat-seconds-ago,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.last.heartbeat.seconds.ago 

rebalance-total

Meter

consumer-share-coordinator-metrics

client-id 

The total number of rebalance events.

kafka.consumer:type=consumer-share-coordinator-metrics,name=rebalance-total,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.rebalance.total 

rebalance-rate-per-hour

Meter

consumer-share-coordinator-metrics

client-id 

The number of rebalance events per hour.

kafka.consumer:type=consumer-share-coordinator-metrics,name=rebalance-rate-per-hour,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.rebalance.rate.per.hour 

fetch-size-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of bytes fetched per request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-size-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.size.avg 

fetch-size-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum number of bytes fetched per request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-size-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.size.max 

bytes-fetched-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of bytes fetched per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=bytes-fetched-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.bytes.fetched.rate 

bytes-fetched-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of bytes fetched.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=bytes-fetched-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.bytes.fetched.total 

records-per-request-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of records in each request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-per-request-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.per.request.avg 

records-per-request-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum number of records in a request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-per-request-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.per.request.max 

records-fetched-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of records fetched per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-fetched-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.fetched.rate 

records-fetched-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of records fetched.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-fetched-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.fetched.total 

acknowledgements-send-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of record acknowledgements sent per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-send-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.send.rate 

acknowledgements-send-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of record acknowledgements sent.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-send-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.send.total 

acknowledgements-error-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of record acknowledgements that resulted in errors per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-error-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.error.rate 

acknowledgements-error-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of record acknowledgements that resulted in errors.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-error-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.error.total 

fetch-latency-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average time taken for a fetch request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-latency-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.latency.avg 

fetch-latency-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum time taken for any fetch request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-latency-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.latency.max 

fetch-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The number of fetch requests per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.rate 

fetch-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of fetch requests.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.total 

fetch-throttle-time-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average throttle time in milliseconds.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-throttle-time-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.throttle.time.avg 

fetch-throttle-time-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum throttle time in milliseconds.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-throttle-time-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.throttle.time.max 

...