Comment: Added share.fetch.purgatory.purge.interval.requests

...

Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. For a share group, the group coordinator does not persist the assignment. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.
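The trigger described above can be sketched as a simple epoch comparison. This is a minimal illustration only: `GroupState`, `maybe_compute_target_assignment` and the `assignor` callback are hypothetical names, not part of the group coordinator's code.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class GroupState:
    group_epoch: int
    target_assignment_epoch: int


def maybe_compute_target_assignment(state: GroupState,
                                    assignor: Callable[[int], None]) -> bool:
    """Run the (hypothetical) server-side assignor if the group epoch moved ahead."""
    if state.group_epoch > state.target_assignment_epoch:
        # The assignment is computed from the group metadata at this group epoch.
        assignor(state.group_epoch)
        # The assignment epoch becomes the group epoch that was used.
        state.target_assignment_epoch = state.group_epoch
        return True
    return False
```

Calling the function a second time without a further group epoch bump does nothing, which mirrors the "larger than" condition in the text.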

...

For a share group, the group coordinator persists seven kinds of records:

  • ShareGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
  • ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
  • ShareGroupPartitionMetadata - this is written to persist the metadata about the partitions the group is assigning to the members.
  • ShareGroupTargetAssignmentMetadata - this is written to persist the assignment epoch.
  • ShareGroupTargetAssignmentMember - this is written to persist the target assignment for a member.
  • ShareGroupCurrentMemberAssignment - this is written to persist the current assignment for a member, and also to maintain the sequence of member epochs.
  • ShareGroupStatePartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.

When the group coordinator fails over, the newly elected coordinator replays the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over, along with the list of members and their assignments. It will bump the group epoch as a result.

...

The group coordinator is responsible for asking the share coordinator to initialize and delete the durable share-partition state. The group coordinator uses the ShareGroupStatePartitionMetadata record to keep track of which share-partitions have been initialized.

...

  • When a topic is added to the set of subscribed topics for a share group and is not yet in the ShareGroupStatePartitionMetadata record
  • When partitions are added to a topic which is already in the ShareGroupStatePartitionMetadata record
  • When KafkaAdmin.alterShareGroupOffsets  is used to reset the SPSO for a share-partition

The state is deleted in the following cases:

  • When a topic in the ShareGroupStatePartitionMetadata record is deleted
  • When KafkaAdmin.deleteShareGroupOffsets  is used to delete state for a share-partition, most likely as a result of an administrator cleaning up a topic which is no longer in use by this share group
  • When the share group is deleted

The group coordinator periodically reconciles its "hard" state in the ShareGroupStatePartitionMetadata record with the "soft" state in the cluster metadata. This is how it observes relevant changes to topics, such as topic deletion. The ShareGroupStatePartitionMetadata record contains the set of topic-partitions which are known to be initialized.
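The reconciliation can be pictured as a comparison between the persisted set of initialized topic-partitions and the current cluster metadata. The following is a rough sketch under assumed data shapes (plain dictionaries and sets); the `reconcile` function and its parameters are hypothetical and do not reflect the coordinator's actual data structures.

```python
def reconcile(initialized, subscribed, cluster):
    """Compare persisted ("hard") state with cluster metadata ("soft" state).

    initialized: dict topic -> set of partitions known to be initialized
    subscribed:  set of topics subscribed by the share group
    cluster:     dict topic -> set of partitions that currently exist
    Returns (to_initialize, to_delete).
    """
    to_initialize = {}
    for topic in subscribed:
        if topic not in cluster:
            continue  # nothing to initialize for a topic that does not exist
        # New topics and newly added partitions both show up as a difference.
        missing = cluster[topic] - initialized.get(topic, set())
        if missing:
            to_initialize[topic] = missing
    # Topics with persisted state that no longer exist need their state deleted.
    to_delete = {topic for topic in initialized if topic not in cluster}
    return to_initialize, to_delete
```

For example, a subscribed topic that grew from two to three partitions yields only the new partition in `to_initialize`, while a deleted topic appears in `to_delete`.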

...

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic added.
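The two steps above amount to "repeat the RPC until every partition has succeeded, then write the record once". A minimal sketch of that pattern follows; `send_initialize` and `write_record` are hypothetical callbacks standing in for the InitializeShareGroupState RPC and the record write, and the retry limit is an assumption made only to keep the example bounded.

```python
def initialize_topic(topic, partitions, send_initialize, write_record, max_attempts=3):
    """Initialize every partition, then persist the topic as initialized.

    send_initialize(topic, partition) -> bool  (hypothetical RPC wrapper)
    write_record(topic, partitions)            (hypothetical record writer)
    """
    pending = set(partitions)
    for _ in range(max_attempts):
        # Re-send only to partitions which have not yet responded successfully.
        pending = {p for p in pending if not send_initialize(topic, p)}
        if not pending:
            # Only now is the topic recorded as initialized.
            write_record(topic, sorted(partitions))
            return True
    return False
```

Because the record is written only after all partitions succeed, a failover part-way through leaves no record, and the new coordinator simply repeats the first step.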

...

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the new partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the array of partitions for the topic increased.

...

  • If the topic still exists, it writes a ShareGroupStatePartitionMetadata record with the topic added to the DeletingTopics array. This enables group coordinator failover to continue the deletion. If the topic does not exist, the requirement to continue deletion can be inferred by the non-existence of the topic.
  • Sends the DeleteShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic removed.
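The deletion steps can be sketched as a small state transition on an in-memory stand-in for the record. Everything here is illustrative: `StatePartitionMetadata`, `delete_topic_state` and the `send_delete` callback are hypothetical names, and the sketch keeps the partition list until deletion completes purely so that a retry still knows which partitions to contact.

```python
from dataclasses import dataclass, field


@dataclass
class StatePartitionMetadata:
    initialized_topics: dict = field(default_factory=dict)  # topic -> list of partitions
    deleting_topics: set = field(default_factory=set)


def delete_topic_state(meta, topic, topic_exists, send_delete, log):
    """One pass of the deletion flow; returns True when deletion completed."""
    partitions = meta.initialized_topics.get(topic, [])
    if topic_exists and topic not in meta.deleting_topics:
        # Step 1: remember the in-progress deletion so a failover can resume it.
        meta.deleting_topics.add(topic)
        log.append(("write", "deleting", topic))
    # Step 2: ask the share coordinators to delete state for every partition.
    results = [send_delete(topic, p) for p in partitions]
    if not all(results):
        return False  # repeat this pass later
    # Step 3: record that the topic has been removed.
    meta.initialized_topics.pop(topic, None)
    meta.deleting_topics.discard(topic)
    log.append(("write", "removed", topic))
    return True
```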

...

Operation

Involves

Notes

Create share group

Group coordinator

This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat RPC. It writes a ShareGroupMetadata record and a ShareGroupPartitionMetadata record for the share group, and a ShareGroupMemberMetadata record for the member, to the __consumer_offsets topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState  to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState  RPC. It writes a ShareSnapshot record to the __share_group_state  topic. When the record is replicated, the share coordinator responds to the RPC.

d) Back in the group coordinator, it writes a ShareGroupStatePartitionMetadata record on the __consumer_offsets topic for the share-partitions which are now initialized. The share-partitions are now able to be included in an assignment in the share group.

Assign a share-partition

Group coordinator and optionally share coordinator

When a topic-partition is eligible to be assigned to a member of a share group for the first time, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupStatePartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group.

List share groups

Group coordinator

Describe share group

Group coordinator

List share group offsets

Group coordinator and share coordinator

The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupStateSummary request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required.

Alter share group offsets

Group coordinator and share coordinator

Only empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata record, and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic. If the partition was not previously initialized, the group coordinator writes an updated ShareGroupStatePartitionMetadata record.

Delete share group offsets

Group coordinator and share coordinator

This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupStatePartitionMetadata record to the __consumer_offsets topic adding the topic to the DeletingTopics array and removing it from the InitializedTopics array. This enables an interrupted deletion to be completed. The group coordinator sends a DeleteShareGroupState request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes an updated ShareGroupStatePartitionMetadata record to the __consumer_offsets topic to complete the deletion of the topic from the share group.

Delete share group

Group coordinator and share coordinator

Only empty share groups can be deleted. If the share group has any topics with initialized state, the group coordinator writes a ShareGroupStatePartitionMetadata record to the __consumer_offsets topic moving all initialized topics into the DeletingTopics array, and then it sends a DeleteShareGroupState request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state topic. Then, the group coordinator writes a tombstone ShareGroupStatePartitionMetadata record and finally a tombstone ShareGroupMetadata record to the __consumer_offsets topic.

...

  • InvalidRecordStateException  - The record state is invalid. The acknowledgement of delivery could not be completed.
  • ShareSessionNotFoundException  - The share session is not found.
  • InvalidShareSessionEpochException  - The share session epoch is invalid.
  • FencedStateEpochException  - The share coordinator rejected the request because the share-group state epoch did not match.

They are all subclasses of RetriableException .

...

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Delete share group.

--delete-offsets

Delete offsets of share group. Supports one share group at a time, and multiple topics.

--describe

Describe share group, members and offset information.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--help

Print usage information.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action and may be used with the '--describe' option only.

--reset-offsets

Reset offsets of share group. Supports one share group at a time, and instances must be inactive. Has 2 execution options, '--dry-run' (the default) and '--execute'. Has 3 reset options: '--to-earliest', '--to-latest' and '--to-datetime'.

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Stable' and 'Dead'.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000)

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or included in the reset offset process. When resetting offsets, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included.

--version

Display Kafka version.

...

Configuration

Description

Values

group.coordinator.rebalance.protocols

The list of enabled rebalance protocols. (Existing configuration)

"share"  is included in the list of protocols to enable share groups.

This will be added to the default value of this configuration property once this feature is complete.

group.share.delivery.count.limit

The maximum number of delivery attempts for a record delivered to a share group.

Default 5, minimum 2, maximum 10
group.share.record.lock.duration.ms

Share-group record acquisition lock duration in milliseconds.

Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)
group.share.min.record.lock.duration.ms

Share-group record acquisition lock minimum duration in milliseconds.

Default 15000 (15 seconds), minimum 1000 (1 second), maximum 30000 (30 seconds)
group.share.max.record.lock.duration.ms

Share-group record acquisition lock maximum duration in milliseconds.

Default 60000 (60 seconds), minimum 30000 (30 seconds), maximum 3600000 (1 hour)
group.share.partition.max.record.locks

Share-group record lock limit per share-partition.

Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms 

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.groups 

The maximum number of share groups.

Default 10, minimum 1, maximum 100
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000
group.share.assignors 

The server-side assignors as a list of full class names. The list must contain only a single entry which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors.

A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor"
share.coordinator.state.topic.num.partitions 

The number of partitions for the share-group state topic (should not change after deployment).

Default 50
share.coordinator.state.topic.replication.factor 

The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.state.topic.segment.bytes 

The log segment size for the share-group state topic.

Default 104857600
share.coordinator.state.topic.min.isr 

Overridden min.insync.replicas for the share-group state topic.

Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.state.topic.compression.codec 

Compression codec for the share-group state topic.

Default 0 (NONE)
share.coordinator.append.linger.ms 

The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.

Default 10, minimum 0
share.coordinator.load.buffer.size 

Batch size for reading from the share-group state topic when loading state information into the cache (soft-limit, overridden if records are too large).

Default 5 * 1024 * 1024 (5242880), minimum 1
share.coordinator.snapshot.update.records.per.snapshot 

The number of update records the share coordinator writes between snapshot records.

Default 500, minimum 0
share.coordinator.threads 

The number of threads used by the share coordinator.

Default 1, minimum 1
share.coordinator.write.timeout.ms 

The duration in milliseconds that the share coordinator will wait for all replicas of the share-group state topic to receive a write.

Default 5000, minimum 1
share.fetch.purgatory.purge.interval.requests 

The purge interval (in number of requests) of the share fetch request purgatory

Default 1000
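To make the default/minimum/maximum convention in the table concrete, here is a small sketch that checks a proposed value against a few of the documented ranges. The `LIMITS` table copies values from the rows above; the `resolve` helper is hypothetical and is not how the broker validates its configuration.

```python
# (default, minimum, maximum) copied from a subset of the table rows above.
LIMITS = {
    "group.share.delivery.count.limit": (5, 2, 10),
    "group.share.record.lock.duration.ms": (30000, 1000, 60000),
    "group.share.partition.max.record.locks": (200, 100, 10000),
    "group.share.max.groups": (10, 1, 100),
    "group.share.max.size": (200, 10, 1000),
}


def resolve(name, value=None):
    """Return the effective value, or raise if it is outside the documented range."""
    default, minimum, maximum = LIMITS[name]
    if value is None:
        return default
    if not minimum <= value <= maximum:
        raise ValueError(f"{name}={value} is outside [{minimum}, {maximum}]")
    return value
```

For instance, `resolve("group.share.delivery.count.limit")` yields the default of 5, while a value of 1 is rejected because the documented minimum is 2.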

Group configuration

The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.

...

  • FindCoordinator  - for finding coordinators, to support share coordinators
  • ListGroups  - for listing groups, to support listing share groups

Access control

This table gives the ACLs required for the new APIs.

...

This KIP adds the following error codes to the Kafka protocol.

  • INVALID_RECORD_STATE  (121) - The record state is invalid. The acknowledgement of delivery could not be completed.
  • SHARE_SESSION_NOT_FOUND  (122) - The share session is not found.
  • INVALID_SHARE_SESSION_EPOCH  (123) - The share session epoch is invalid.
  • FENCED_STATE_EPOCH  (124) - The share coordinator rejected the request because share-group state epoch did not match.

...

The KIP introduces version 6.

Request schema

Version 6 adds the new key type of FindCoordinatorRequest.CoordinatorType.SHARE with value 2 with the key of "group:topicId:partition".

Code Block
{
  "apiKey": 10,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "FindCoordinatorRequest",
  // Version 1 adds KeyType.
  //
  // Version 2 is the same as version 1.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds support for batching via CoordinatorKeys (KIP-699)
  //
  // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
  //
  // Version 6 adds support for share groups (KIP-932).
  "validVersions": "0-6",
  "deprecatedVersions": "0",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "Key", "type": "string", "versions": "0-3",
      "about": "The coordinator key." },
    { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
      "about": "The coordinator key type. (Group, transaction, share, etc.)" },
    { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
      "about": "The coordinator keys." }
  ]
}

Response schema

Version 6 is the same as version 5.
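As an illustration of the share coordinator key format, the sketch below builds a "group:topicId:partition" key and a dictionary mirroring the request fields from the schema above. The helper names are hypothetical, the topic ID is treated as an opaque string, and no wire encoding is attempted.

```python
COORDINATOR_TYPE_SHARE = 2  # FindCoordinatorRequest.CoordinatorType.SHARE


def share_coordinator_key(group_id: str, topic_id: str, partition: int) -> str:
    """Build a coordinator key of the form group:topicId:partition."""
    return f"{group_id}:{topic_id}:{partition}"


def find_share_coordinators(keys):
    """Return a dict shaped like the request body (versions 4+ use CoordinatorKeys)."""
    return {"KeyType": COORDINATOR_TYPE_SHARE, "CoordinatorKeys": list(keys)}
```

A request for partition 3 of a topic in group "g1" would then carry a single key such as "g1:<topicId>:3" with KeyType 2.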

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

Request schema

The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeat.

Code Block
{
  "apiKey": 76,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
    { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }
  ]
}

Response schema

The group coordinator will only send the Assignment field when it changes.

Code Block
{
  "apiKey": 76,
  "type": "response",
  "name": "ShareGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The member ID generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.", "fields": [
        { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member." }
    ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
    ]}
  ]
}

ShareGroupDescribe API

The ShareGroupDescribe API is used to describe share groups.

Request schema

Code Block
{
  "apiKey": 77,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The ids of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
}

Response schema

Code Block
{
  "apiKey": 77,
  "type": "response",
  "name": "ShareGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member rack ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The current member epoch." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
        { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The assigned topic-partitions to the member." }
    ]}
  ]
}

ShareFetch API

The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.

The first request from a share consumer to a share-partition leader broker establishes a share session by setting MemberId to the member ID it received from the group coordinator and ShareSessionEpoch to 0. Each subsequent ShareFetch or ShareAcknowledge request then specifies the MemberId and increments the ShareSessionEpoch by one. When the share consumer wishes to close the share session, it sets MemberId to the member ID and ShareSessionEpoch to -1.
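
The epoch sequence described above can be sketched as a small client-side helper. This is only an illustration: the class name `ShareSessionEpochs` and its methods are hypothetical, and the sketch does not model int32 overflow or error recovery.

```python
class ShareSessionEpochs:
    """Tracks the ShareSessionEpoch a consumer would send to one leader broker.

    Hypothetical helper for illustration; not part of any Kafka client API.
    """

    INITIAL_EPOCH = 0   # opens a share session
    FINAL_EPOCH = -1    # closes a share session

    def __init__(self, member_id):
        self.member_id = member_id
        self._next_epoch = self.INITIAL_EPOCH

    def next_request(self):
        """Session fields for the next ShareFetch or ShareAcknowledge request."""
        header = {"MemberId": self.member_id, "ShareSessionEpoch": self._next_epoch}
        self._next_epoch += 1  # each subsequent request increments the epoch by one
        return header

    def close_request(self):
        """Session fields for the request that closes the share session."""
        header = {"MemberId": self.member_id, "ShareSessionEpoch": self.FINAL_EPOCH}
        self._next_epoch = self.INITIAL_EPOCH  # a later request would open a new session
        return header
```

A consumer would keep one such tracker per share-partition leader, since a share session is established with each leader broker separately.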

When piggybacking acknowledgements in this request, there are a few special cases.

  • If acknowledgements are being made for a partition and no records should be fetched, PartitionMaxBytes should be set to zero.
  • If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the Topics array with PartitionMaxBytes set to zero AND the partition is included in ForgottenTopicsData.
  • If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the Topics array and ShareSessionEpoch is set to -1. No data will be fetched and it is not necessary to include the partition in ForgottenTopicsData.
  • If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the AcknowledgeErrorCode in the response will be set to the same value for all partitions which had piggybacked acknowledgements.
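
As a rough sketch of the first two special cases, the following builds the relevant parts of a ShareFetch request body as plain dictionaries keyed by the schema field names. The function names are hypothetical and the topic ID is shown as a plain string for brevity.

```python
def acknowledge_only_partition(partition_index, batches):
    """Partition entry that acknowledges records without fetching any."""
    return {
        "PartitionIndex": partition_index,
        "PartitionMaxBytes": 0,  # zero means: acknowledge only, fetch nothing
        "AcknowledgementBatches": batches,
    }


def request_removing_partition(topic_id, partition_index, batches):
    """Acknowledge records for a partition that is leaving the share session.

    The partition appears in Topics (with PartitionMaxBytes 0) AND in
    ForgottenTopicsData.
    """
    return {
        "Topics": [
            {"TopicId": topic_id,
             "Partitions": [acknowledge_only_partition(partition_index, batches)]}
        ],
        "ForgottenTopicsData": [
            {"TopicId": topic_id, "Partitions": [partition_index]}
        ],
    }
```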

Request schema

For the AcknowledgementBatches of each topic-partition, the FirstOffset values must be in ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch, the AcknowledgeTypes array can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.
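
The two constraints above can be checked with a few lines of code. The sketch below is illustrative only: `validation_errors` is a hypothetical helper, batches are plain dictionaries keyed by the schema field names, and the set of valid types is taken from the AcknowledgeTypes description in the schema.

```python
VALID_ACKNOWLEDGE_TYPES = {0, 1, 2, 3}  # 0:Gap, 1:Accept, 2:Release, 3:Reject


def validation_errors(batches):
    """Return a list of problems found in one partition's AcknowledgementBatches."""
    errors = []
    previous_last = None
    for i, batch in enumerate(batches):
        first, last = batch["FirstOffset"], batch["LastOffset"]
        types = batch["AcknowledgeTypes"]
        if last < first:
            errors.append(f"batch {i}: LastOffset is before FirstOffset")
            continue
        # Ascending FirstOffset plus non-overlapping ranges means each batch
        # must start after the previous batch's (inclusive) LastOffset.
        if previous_last is not None and first <= previous_last:
            errors.append(f"batch {i}: not ascending or overlaps the previous batch")
        # Either one type for the whole batch, or one type per offset.
        size = last - first + 1
        if len(types) not in (1, size):
            errors.append(f"batch {i}: expected 1 or {size} acknowledge types")
        if any(t not in VALID_ACKNOWLEDGE_TYPES for t in types):
            errors.append(f"batch {i}: unknown acknowledge type")
        previous_last = last
    return errors
```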

Code Block
{
  "apiKey": 78,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": 78,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors for ErrorCode and AcknowledgeErrorCode:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top-level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The fetch error code, or 0 if there was no fetch error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The fetch error message, or null if there was no fetch error." },
        { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no acknowledge error." },
        { "name": "AcknowledgeErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The acknowledge error message, or null if there was no acknowledge error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          { "name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
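
To illustrate how a consumer might interpret the AcquiredRecords array of a ShareFetch response, the sketch below expands each batch into a per-offset delivery count. It assumes that LastOffset is inclusive (as it is for AcknowledgementBatches) and that batches are plain dictionaries keyed by the schema field names; `delivery_counts` is a hypothetical helper.

```python
def delivery_counts(acquired_records):
    """Map each acquired offset to the delivery count of its batch.

    Assumes LastOffset is inclusive, so a batch covers
    FirstOffset..LastOffset.
    """
    counts = {}
    for batch in acquired_records:
        for offset in range(batch["FirstOffset"], batch["LastOffset"] + 1):
            counts[offset] = batch["DeliveryCount"]
    return counts
```

Only the offsets listed in AcquiredRecords have been acquired by this consumer, so a client would use a mapping like this to decide which of the fetched records it may process and later acknowledge.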

ShareAcknowledge API

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

For the AcknowledgementBatches of each topic-partition, the FirstOffset values must be in ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch, the AcknowledgeTypes array can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.
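
One way a client could produce batches that satisfy these rules is to start from a per-offset map of acknowledge types and merge consecutive offsets. The following is a minimal sketch under that assumption; `to_batches` is a hypothetical helper, not part of the proposed API.

```python
def to_batches(acks):
    """Turn {offset: acknowledge_type} into AcknowledgementBatches.

    Consecutive offsets are merged into one batch. If every offset in a
    batch has the same type, the batch carries a single AcknowledgeTypes
    value; otherwise it carries one value per offset.
    """
    runs = []  # each run is [first_offset, last_offset, [types...]]
    for offset in sorted(acks):
        if runs and runs[-1][1] + 1 == offset:
            runs[-1][1] = offset
            runs[-1][2].append(acks[offset])
        else:
            runs.append([offset, offset, [acks[offset]]])
    batches = []
    for first, last, types in runs:
        compact = [types[0]] if len(set(types)) == 1 else types
        batches.append({"FirstOffset": first, "LastOffset": last,
                        "AcknowledgeTypes": compact})
    return batches
```

Because the offsets are processed in sorted order and a new batch starts at every gap, the resulting FirstOffset values are ascending and the ranges cannot overlap.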

Code Block
{
  "apiKey": 79,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": 79,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

AlterShareGroupOffsets API

The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "AlterShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to alter offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
        "about": "Each partition to alter offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "AlterShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
   "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

DeleteShareGroupOffsets API

The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to delete offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeleteShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or 0 if there was no error." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
        "about": "The error message, or null if there was no error." }
    ]}
   ]
}

...

DescribeShareGroupOffsets API

The DescribeShareGroupOffsets API is used to describe the share-partition start offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to describe offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}
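
As an illustration of how a caller might consume this response, the following Python sketch flattens a decoded DescribeShareGroupOffsets response into a per-partition map. It assumes the response has already been decoded into nested dictionaries keyed by the schema field names. The function name `start_offsets_by_partition` and the sample values are hypothetical and are not part of this KIP.

```python
from typing import Dict, Tuple, Union


def start_offsets_by_partition(response: dict) -> Dict[Tuple[str, int], Union[int, str]]:
    """Map (topic name, partition index) to a start offset or an error string."""
    result: Dict[Tuple[str, int], Union[int, str]] = {}
    for topic in response["Responses"]:
        for partition in topic["Partitions"]:
            key = (topic["TopicName"], partition["PartitionIndex"])
            if partition["ErrorCode"] != 0:
                # This sketch ignores StartOffset whenever ErrorCode is non-zero.
                result[key] = f"error {partition['ErrorCode']}: {partition['ErrorMessage']}"
            else:
                result[key] = partition["StartOffset"]
    return result


# Hypothetical decoded response, for illustration only.
sample_response = {
    "ThrottleTimeMs": 0,
    "Responses": [
        {
            "TopicName": "orders",
            "TopicId": "hypothetical-topic-id",
            "Partitions": [
                {"PartitionIndex": 0, "StartOffset": 120, "ErrorCode": 0, "ErrorMessage": None},
                {"PartitionIndex": 1, "StartOffset": -1, "ErrorCode": 3,
                 "ErrorMessage": "Unknown topic or partition"},
            ],
        }
    ],
}

offsets = start_offsets_by_partition(sample_response)
```

Keeping errors per partition, rather than failing the whole call, mirrors the schema, which carries ErrorCode and ErrorMessage at the partition level.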

...

InitializeShareGroupState API

The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitializeShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]InitializeStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being initialized." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "InitializeShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]InitializeStateResult", "versions": "0+",
      "about": "The initialization results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupState API

The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}
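
To make the StateBatches structure concrete, here is a minimal Python sketch that expands the batches of one partition result into a per-offset view. It assumes that LastOffset is inclusive and that the partition result has been decoded into a dictionary keyed by the schema field names. The helper name `expand_state_batches` and the sample data are hypothetical.

```python
from typing import Dict, Tuple

# Delivery state codes as listed in the DeliveryState field description.
DELIVERY_STATES = {0: "Available", 2: "Acked", 4: "Archived"}


def expand_state_batches(partition_result: dict) -> Dict[int, Tuple[str, int]]:
    """Return {offset: (delivery state name, delivery count)} for every offset in a batch."""
    per_offset: Dict[int, Tuple[str, int]] = {}
    for batch in partition_result["StateBatches"]:
        state = DELIVERY_STATES[batch["DeliveryState"]]
        # Assumption: FirstOffset and LastOffset are both inclusive.
        for offset in range(batch["FirstOffset"], batch["LastOffset"] + 1):
            per_offset[offset] = (state, batch["DeliveryCount"])
    return per_offset


# Hypothetical partition result, for illustration only.
sample_partition = {
    "Partition": 0,
    "ErrorCode": 0,
    "ErrorMessage": None,
    "StateEpoch": 1,
    "StartOffset": 100,
    "StateBatches": [
        {"FirstOffset": 100, "LastOffset": 104, "DeliveryState": 2, "DeliveryCount": 1},
        {"FirstOffset": 105, "LastOffset": 106, "DeliveryState": 0, "DeliveryCount": 2},
    ],
}

expanded = expand_state_batches(sample_partition)
```

Offsets that no batch covers are simply absent from the result. How a share-partition leader treats such offsets is outside the scope of this sketch.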

...

WriteShareGroupState API

The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being written." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]WriteStateResult", "versions": "0+",
      "about": "The write results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}
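
The StateBatches array in the write request describes ranges of offsets that share a delivery state and delivery count. As a rough sketch of how a writer could build that array, the following Python code coalesces per-offset state into contiguous batches. It is not the algorithm used by Kafka; the function name `to_state_batches` and the input shape (a dictionary from offset to a `(delivery_state, delivery_count)` pair) are assumptions made for illustration, and LastOffset is assumed to be inclusive.

```python
from typing import Dict, List, Tuple


def to_state_batches(per_offset: Dict[int, Tuple[int, int]]) -> List[dict]:
    """Coalesce {offset: (delivery_state, delivery_count)} into StateBatch-shaped dicts."""
    batches: List[dict] = []
    for offset in sorted(per_offset):
        state, count = per_offset[offset]
        last = batches[-1] if batches else None
        if (last is not None
                and last["LastOffset"] + 1 == offset
                and last["DeliveryState"] == state
                and last["DeliveryCount"] == count):
            # Contiguous offset with identical state: extend the current batch.
            last["LastOffset"] = offset
        else:
            # Gap in offsets or a change of state: start a new batch.
            batches.append({
                "FirstOffset": offset,
                "LastOffset": offset,
                "DeliveryState": state,
                "DeliveryCount": count,
            })
    return batches


# Hypothetical per-offset state: 2 = Acked, 0 = Available.
batches = to_state_batches({10: (2, 1), 11: (2, 1), 12: (0, 2), 14: (0, 2)})
```

Offsets 10 and 11 collapse into one batch, offset 12 starts a new batch because the state changes, and offset 14 starts another because offset 13 is missing.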

DeleteShareGroupState API

The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
      "about": "The delete results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupStateSummary API

The ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateSummaryRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadStateSummaryData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateSummaryResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateSummaryResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

...

Records

This section describes the new record types.

Group and assignment metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets  topic.

With the exception of the ShareGroupStatePartitionMetadata record, all of the record types are analogous to those introduced in KIP-848 for consumer groups.

For each share group, a ShareGroupMetadata record is written for the group epoch, and a ShareGroupPartitionMetadata record is written for the topic-partitions being assigned by the group. When the group is deleted, tombstone records are written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.

For each share group, a ShareGroupTargetAssignmentMetadata record is written to record the group epoch used to compute the assignment. For each member, there is a ShareGroupTargetAssignmentMember record which persists the target assignment, and a ShareGroupCurrentMemberAssignment record which persists the current assignment and is also used to keep track of the member epoch.

There is also a ShareGroupStatePartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
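
Because these records live on a compacted topic, a coordinator that reads the partition from the beginning can rebuild its view by keeping the latest value per key and dropping keys whose latest record is a tombstone. The Python sketch below illustrates that general replay pattern only. The tuple-shaped keys, the `replay` function, and the sample log are hypothetical simplifications and do not reflect the serialized record formats.

```python
from typing import Dict, Iterable, Optional, Tuple

# Simplified key: (record type, group id) or (record type, group id, member id).
Key = Tuple[str, ...]


def replay(records: Iterable[Tuple[Key, Optional[dict]]]) -> Dict[Key, dict]:
    """Rebuild state from an ordered log of (key, value) pairs; value None is a tombstone."""
    state: Dict[Key, dict] = {}
    for key, value in records:
        if value is None:
            # Tombstone: the group was deleted or the member left.
            state.pop(key, None)
        else:
            # A later record for the same key replaces the earlier one.
            state[key] = value
    return state


# Hypothetical log: member m1 joins and later leaves, and the group epoch is bumped.
log = [
    (("ShareGroupMetadata", "g1"), {"Epoch": 1}),
    (("ShareGroupMemberMetadata", "g1", "m1"), {"ClientId": "c1"}),
    (("ShareGroupMemberMetadata", "g1", "m2"), {"ClientId": "c2"}),
    (("ShareGroupMetadata", "g1"), {"Epoch": 2}),
    (("ShareGroupMemberMetadata", "g1", "m1"), None),
]

state = replay(log)
```

After the replay, only the latest group epoch and the remaining member m2 are present.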

ShareGroupMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataKey",
  "validVersions": "11",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "11",
      "about": "The group id." }
  ]
}

ShareGroupMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." }
  ]
}

...

ShareGroupPartitionMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataKey",
  "validVersions": "9",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "9",
      "about": "The group id." }
  ]
}

ShareGroupPartitionMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata",
      "about": "The list of topic metadata.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid",
        "about": "The topic id." },
      { "name": "TopicName", "versions": "0+", "type": "string",
        "about": "The topic name." },
      { "name": "NumPartitions", "versions": "0+", "type": "int32",
        "about": "The number of partitions of the topic." },
      { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
        "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored.", "fields": [
        { "name": "Partition", "versions": "0+", "type": "int32",
          "about": "The partition number." },
        { "name": "Racks", "versions": "0+", "type": "[]string",
          "about": "The set of racks that the partition is mapped to." }
      ]}
    ]}
  ]
}

ShareGroupMemberMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataKey",
  "validVersions": "10",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "10",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "10",
      "about": "The member id." }
  ]
}

ShareGroupMemberMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) rack id." },
    { "name": "ClientId", "versions": "0+", "type": "string",
      "about": "The client id." },
    { "name": "ClientHost", "versions": "0+", "type": "string",
      "about": "The client host." },
    { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string",
      "about": "The list of subscribed topic names." }
  ]
}

...

ShareGroupTargetAssignmentMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMetadataKey",
  "validVersions": "12",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "12",
      "about": "The group id." }
  ]
}

ShareGroupTargetAssignmentMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
      "about": "The assignment epoch." }
  ]
}

ShareGroupTargetAssignmentMemberKey

Code Block
{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMemberKey",
  "validVersions": "13",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "13",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "13",
      "about": "The member id." }
  ]
}

ShareGroupTargetAssignmentMemberValue

Code Block
{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMemberValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicPartitions", "versions": "0+", "type": "[]TopicPartition",
      "about": "The assigned partitions.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "Partitions", "versions": "0+", "type": "[]int32" }
    ]}
  ]
}

Records

This section describes the new record types.

Group metadata

To coexist properly with consumer groups, the group coordinator persists the group metadata records for share groups to the compacted __consumer_offsets topic.

For each share group, a single ShareGroupMetadata record is written. When the group is deleted, a tombstone record is written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.

There is also a ShareGroupStatePartitionMetadata record, which contains a list of all share-partitions that have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
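The write-then-tombstone pattern described above can be illustrated with a minimal sketch of how a newly elected coordinator might rebuild its view by replaying a compacted log. The tuple-shaped key, the `replay` function and the sample values are assumptions made for illustration only; they are not the coordinator's actual data structures.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical key layout for this sketch: (record_type, group_id, member_id).
# member_id is None for group-level records.
Key = Tuple[str, str, Optional[str]]


def replay(records: Iterable[Tuple[Key, Optional[dict]]]) -> Dict[Key, dict]:
    """Rebuild the latest state from a compacted log, applying tombstones."""
    state: Dict[Key, dict] = {}
    for key, value in records:
        if value is None:
            # Tombstone: the group was deleted or the member left.
            state.pop(key, None)
        else:
            # A later record for the same key replaces the earlier one.
            state[key] = value
    return state


log = [
    (("ShareGroupMetadata", "g1", None), {"Epoch": 1}),
    (("ShareGroupMemberMetadata", "g1", "m1"), {"ClientId": "c1"}),
    (("ShareGroupMemberMetadata", "g1", "m2"), {"ClientId": "c2"}),
    (("ShareGroupMetadata", "g1", None), {"Epoch": 2}),
    (("ShareGroupMemberMetadata", "g1", "m2"), None),  # m2 leaves the group
]
state = replay(log)
```

After the replay, only the latest group metadata record and the record for the remaining member are left, which is the state a new coordinator would need to continue the group's membership.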

...

}

ShareGroupCurrentMemberAssignmentKey

Code Block
{
  "type": "data",
  "name": "ShareGroupCurrentMemberAssignmentKey",
  "validVersions": "14",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "14",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "14",
      "about": "The member id." }
  ]
}

ShareGroupMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." }
  ]
}

...

ShareGroupCurrentMemberAssignmentValue

Code Block
{
  "type": "data",
  "name": "ShareGroupCurrentMemberAssignmentValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "MemberEpoch", "versions": "0+", "type": "int32",
      "about": "The current member epoch that is expected from the member in the heartbeat request." },
    { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
      "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
    { "name": "State", "versions": "0+", "type": "int8",
      "about": "The member state. See MemberState for the possible values." },
    { "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions",
      "about": "The partitions assigned to (or owned by) this member." }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic Id." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition Ids." }
    ]}
  ]
}
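
The MemberEpoch and PreviousMemberEpoch fields suggest how a heartbeat's epoch could be checked. The following is a simplified sketch under that reading, not the coordinator's actual validation logic; the `CurrentMemberAssignment` class and `heartbeat_epoch_ok` function are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CurrentMemberAssignment:
    """Subset of ShareGroupCurrentMemberAssignmentValue used by this sketch."""
    member_epoch: int
    previous_member_epoch: int


def heartbeat_epoch_ok(assignment: CurrentMemberAssignment, request_epoch: int) -> bool:
    """Accept the current epoch, or the previous one if the bump was lost."""
    if request_epoch == assignment.member_epoch:
        return True
    # The member never saw the last epoch bump and retries with the old epoch.
    return request_epoch == assignment.previous_member_epoch


assignment = CurrentMemberAssignment(member_epoch=6, previous_member_epoch=5)
```

Under these assumptions, a heartbeat carrying epoch 6 or 5 would be accepted, while any other epoch would be treated as stale or invalid.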

...

ShareGroupStatePartitionMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupStatePartitionMetadataKey",
  "validVersions": "15",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "15",
      "about": "The group id." }
  ]
}
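
Each key schema in this section declares a distinct version number. Assuming that this version is what distinguishes one record type from another within the shared topic, a reader of the log could dispatch on it as sketched below. The mapping covers only the four new key schemas in this section, and the `record_type_for` helper is hypothetical.

```python
# Key version -> record type, taken from the "validVersions" of the key schemas.
RECORD_TYPE_BY_KEY_VERSION = {
    12: "ShareGroupTargetAssignmentMetadata",
    13: "ShareGroupTargetAssignmentMember",
    14: "ShareGroupCurrentMemberAssignment",
    15: "ShareGroupStatePartitionMetadata",
}


def record_type_for(key_version: int) -> str:
    """Return the record type for a key version, or raise for unknown versions."""
    try:
        return RECORD_TYPE_BY_KEY_VERSION[key_version]
    except KeyError:
        raise ValueError(f"unknown record key version: {key_version}") from None
```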

...

ShareGroupStatePartitionMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupStatePartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo",
      "about": "The topics with initialized share-group state." },
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo",
      "about": "The topics whose share-group state is being deleted." }
  ],
  "commonStructs": [
    { "name": "TopicPartitionsInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." }
    ]}
  ]
}
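
As a sketch of how the InitializedTopics list might be used, the function below computes which share-partitions still need their durable state initialized, given the partitions currently being consumed. It assumes plain string topic IDs in place of UUIDs, and the `partitions_needing_init` name is hypothetical.

```python
from typing import Dict, Set


def partitions_needing_init(
    consumed: Dict[str, Set[int]],
    initialized: Dict[str, Set[int]],
) -> Dict[str, Set[int]]:
    """Return, per topic ID, the consumed partitions not yet initialized."""
    pending: Dict[str, Set[int]] = {}
    for topic_id, partitions in consumed.items():
        missing = partitions - initialized.get(topic_id, set())
        if missing:
            pending[topic_id] = missing
    return pending


consumed = {"topic-a": {0, 1, 2}, "topic-b": {0}}
initialized = {"topic-a": {0, 1}}
pending = partitions_needing_init(consumed, initialized)
```

In this example, partition 2 of topic-a and partition 0 of topic-b have no initialized state yet, so they are the ones the group coordinator would ask the share coordinator to initialize.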

...

Code Block
{
  "type": "data",
  "name": "ShareSnapshotKey",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." }
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareUpdateKey",
  "validVersions": "1",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "1",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "1",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "1",
      "about": "The partition index." }
  ]
}

...

Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The count and rate of share group rebalances.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 

kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

num-partitions

Gauge

group-coordinator-metrics

protocol: share

The number of share partitions managed by group coordinator. 

kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share 

group-count

Gauge

group-coordinator-metrics

protocol: share

state: {Empty|Stable|Dead}

The number of share groups in each state.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={Empty|Stable|Dead}

share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)

Meter

share-group-metrics


The total number of share acknowledgement requests.

kafka.server:type=share-group-metrics,name=share-acknowledgement-rate,protocol=share

kafka.server:type=share-group-metrics,name=share-acknowledgement-count,protocol=share

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

share-group-metrics

ack-type:{Accept,Release,Reject}


The number of records acknowledged per acknowledgement type.

kafka.server:type=share-group-metrics,name=record-acknowledgement-rate,protocol=share,ack-type={Accept,Release,Reject}

kafka.server:type=share-group-metrics,name=record-acknowledgement-count,protocol=share,ack-type={Accept,Release,Reject}

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-group-metrics


The time taken to load the share partitions.

kafka.server:type=share-group-metrics,name=partition-load-time-avg,protocol=share 

kafka.server:type=share-group-metrics,name=partition-load-time-max,protocol=share  

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-coordinator-metrics


The time taken in milliseconds to load the share-group state from the share-group state partitions.

kafka.server:type=share-coordinator-metrics,name=partition-load-time-avg 

kafka.server:type=share-coordinator-metrics,name=partition-load-time-max

thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg)

Meter

share-coordinator-metrics


The fraction of time the share coordinator thread is idle.

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-min 

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-avg 

write (write-rate and write-total)

Meter

share-coordinator-metrics


The number of share-group state write calls per second.

kafka.server:type=share-coordinator-metrics,name=write-rate 

kafka.server:type=share-coordinator-metrics,name=write-total 

write-latency (write-latency-avg and write-latency-max)

Meter

share-coordinator-metrics


The time taken for a share-group state write call, including the time to write to the share-group state topic.

kafka.server:type=share-coordinator-metrics,name=write-latency-avg 

kafka.server:type=share-coordinator-metrics,name=write-latency-max 

num-partitions

Gauge

share-coordinator-metrics


The number of partitions in the share-state topic.

kafka.server:type=share-coordinator-metrics,name=num-partitions 

...