...
Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. For a share group, the group coordinator does not persist the assignment. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.
...
For a share group, the group coordinator persists three seven kinds of records:
- ShareGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
- ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
- ShareGroupPartitionMetadata - this is written to persist the metadata about the partitions the group is assigning to the members.
- ShareGroupTargetAssignmentMetadata - this is written to persist the assignment epoch.
- ShareGroupTargetAssignmentMember - this is written to persist the target assignment for a member.
- ShareGroupCurrentMemberAssignment - this is written to persist the current assignment for a member, and also to maintain the sequence of member epochs.
- ShareGroupStatePartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.
When the group coordinator fails over, the newly elected coordinator replays the state from the __consumer_offsets
partition. This means a share group will remain in existence across the fail-over, along with the list of members . However, the assignments are not persisted and will be recalculated by the new group coordinatorand their assignments. It will bump the group epoch as a result.
...
The group coordinator is responsible for asking the share coordinator to initialize and delete the durable share-partition state. The group coordinator uses the ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record to keep track of which share-partitions have been initialized.
...
- When a topic is added to the set of subscribed topics for a share group and is not yet in the ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record
- When partitions are added to a topic which is already in the ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record
- When
KafkaAdmin.alterShareGroupOffsets
is used to reset the SPSO for a share-partition
The state is deleted in the following cases:
- When a topic in the ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record is deleted
- When
KafkaAdmin.deleteShareGroupOffsets
is used to delete state for a share-partition, most likely as a result of an administrator cleaning up a topic which is no longer in use by this share group - When the share group is deleted
The group coordinator periodically reconciles its "hard" state in the ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata with the "soft" state in the cluster metadata. This is how it observes relevant changes to topics, such as topic deletion. The ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata contains the set of topic-partitions which are known to be initialized.
...
- Sends the
InitializeShareGroupState
RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover. - When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record with the topic added.
...
- Sends the
InitializeShareGroupState
RPC to the share coordinators for all of the new partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover. - When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record with the array of partitions for the topic increased.
...
- If the topic still exists, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record with the topic added to the
DeletingTopics
array. This enables group coordinator failover to continue the deletion. If the topic does not exist, the requirement to continue deletion can be inferred by the non-existence of the topic. - Sends the
DeleteShareGroupState
RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover. - When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record with the topic removed.
...
Operation | Involves | Notes |
---|---|---|
Create share group | Group coordinator | This occurs as a side-effect of the initial a) The group coordinator serves the b) For each share-partition being initialized, the group coordinator sends an c) The share coordinator serves the d) Back in the group coordinator, it writes a ShareGroupPartitionMetadata ShareGroupPartitionStateMetadata record on the |
Assign a share-partition | Group coordinator and optionally share coordinator | When a topic-partition is eligible to be assigned to a member of a share group for the first time, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group. |
List share groups | Group coordinator | |
Describe share group | Group coordinator | |
List share group offsets | Group coordinator and share coordinator | The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupStateSummary request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share-coordinator when it is next required. |
Alter share group offsets | Group coordinator and share coordinator | Only empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata, and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic. If the partition was not previously initialized, the group coordinator writes an updated ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record. |
Delete share group offsets | Group coordinator and share coordinator | This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record to the __consumer_offsets topic adding the topic to the DeletingTopics array and removing it from the InitializedTopics array. This enables an interrupted deletion to be completed. The group coordinator sends a DeleteShareGroupState request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes an updated ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record to the __consumer_offsets topic to complete the deletion of the topic from the share group. |
Delete share group | Group coordinator and share coordinator | Only empty share groups can be deleted. If the share group has any topics with initialized state, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record to the |
...
InvalidRecordStateException
- The record state is invalid. The acknowledgement of delivery could not be completed.ShareSessionNotFoundException
- The share session is not found.InvalidShareSessionEpochException
- The share session epoch is invalid.FencedStateEpochException
- The share coordinator rejected the request because the share-group state epoch did not match.
They are all subclasses of RetriableException
.
...
Option | Description |
---|---|
--all-topics | Consider all topics assigned to a group in the `reset-offsets` process. |
--bootstrap-server <String: server to connect to> | REQUIRED: The server(s) to connect to. |
--command-config <String: command config property file> | Property file containing configs to be passed to Admin Client. |
--delete | Pass in groups to delete topic partition offsets over the entire Delete share group. For instance --group g1 --group g2 |
--delete-offsets | Delete offsets of share group. Supports one share group at the time, and multiple topics. |
--describe | Describe share group, members and list offset lag (number of records not yet processed) related to given groupoffset information. |
--dry-run | Only show results without executing changes on share groups. Supported operations: reset-offsets. |
--execute | Execute operation. Supported operations: reset-offsets. |
--group <String: share group> | The share group we wish to act on. |
--help | Print usage information. |
--list | List all share groups. |
--members | Describe members of the group. This option may be used with the '--describe' option only. |
--offsets | Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of and may be used with the '--describe' option only. |
--reset-offsets | Reset offsets of share group. Supports one share group at a time, and instances must be inactive. If neither Has 2 execution options, '--dry-run' nor (the default) and '–execute' is specified,'. Has 3 reset options: '--to-earliest', '--to-latest' and '--to-datetime'. |
--state [String] | When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Stable' and 'Dead'. |
--timeout <Long: timeout (ms)> | The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000). (default: 5000) |
--to-datetime <String: datetime> | Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'. |
--to-earliest | Reset offsets to earliest offset. |
--to-latest | Reset offsets to latest offset. |
--topic <String: topic> | The topic whose share group information should be deleted or included in the reset offset process. In `reset-offsets` caseWhen resetting offsets, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included. |
--version | Display Kafka version. |
...
Configuration | Description | Values |
---|---|---|
group.coordinator.rebalance.protocols | The list of enabled rebalance protocols. (Existing configuration) |
This will be added to the default value of this configuration property once this feature is complete. |
group.share.delivery.count.limit | The maximum number of delivery attempts for a record delivered to a share group. | Default 5, minimum 2, maximum 10 |
group.share.record.lock.duration.ms | Share-group record acquisition lock duration in milliseconds. | Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds) |
group.share.min.record.lock.duration.ms | Share-group record acquisition lock minimum duration in milliseconds. | Default 15000 (15 seconds), minimum 1000 (1 second), maximum 30000 (30 seconds) |
group.share.max.record.lock.duration.ms | Share-group record acquisition lock maximum duration in milliseconds. | Default 60000 (60 seconds), minimum 30000 (30 seconds), maximum 3600000 (1 hour) |
group.share.partition.max.record.locks | Share-group record lock limit per share-partition. | Default 200, minimum 100, maximum 10000 |
group.share.session.timeout.ms | The timeout to detect client failures when using the group protocol. | Default 45000 (45 seconds) |
group.share.min.session.timeout.ms | The minimum session timeout. | Default 45000 (45 seconds) |
group.share.max.session.timeout.ms | The maximum session timeout. | Default 60000 (60 seconds) |
group.share.heartbeat.interval.ms | The heartbeat interval given to the members. | Default 5000 (5 seconds) |
group.share.min.heartbeat.interval.ms | The minimum heartbeat interval. | Default 5000 (5 seconds) |
group.share.max.heartbeat.interval.ms | The maximum heartbeat interval. | Default 15000 (15 seconds) |
group.share.max.groups | The maximum number of share groups. | Default 10, minimum 1, maximum 100 |
group.share.max.size | The maximum number of consumers that a single share group can accommodate. | Default 200, minimum 10, maximum 1000 |
group.share.assignors | The server-side assignors as a list of full class names. The list must contain only a single entry which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors. | A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor" |
share.coordinator.state.topic.num.partitions | The number of partitions for the share-group state topic (should not change after deployment). | Default 50 |
share.coordinator.state.topic.replication.factor | The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement. | Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use) |
share.coordinator.state.topic.segment.bytes | The log segment size for the share-group state topic. | Default 104857600 |
share.coordinator.state.topic.min.isr | Overridden min.insync.replicas for the share-group state topic. | Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use) |
share.coordinator.state.topic.compression.codec | Compression codec for the share-group state topic. | Default 0 (NONE) |
share.coordinator.append.linger.ms | The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk. | Default 10, minimum 0 |
share.coordinator.load.buffer.size | Batch size for reading from the share-group state topic when loading state information into the cache (soft-limit, overridden if records are too large). | Default 5 * 1024 * 1024 (5242880), minimum 1 |
share.coordinator.snapshot.update.records.per.snapshot | The number of update records the share coordinator writes between snapshot records. | Default 500, minimum 0 |
share.coordinator.threads | The number of threads used by the share coordinator. | Default 1, minimum 1 |
share.coordinator.write.timeout.ms | The duration in milliseconds that the share coordinator will wait for all replicas of the share-group state topic to receive a write. | Default 5000, minimum 1 |
share.fetch.purgatory.purge.interval.requests | The purge interval (in number of requests) of the share fetch request purgatory | Default 1000 |
Group configuration
The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.
...
FindCoordinator
- for finding coordinators, to support share coordinatorsListGroups
- for listing groups, to support listing share groups
Access controlAccess control
This table gives the ACLs required for the new APIs.
...
This KIP adds the following error codes the Kafka protocol.
INVALID_RECORD_STATE
(121) - The record state is invalid. The acknowledgement of delivery could not be completed.SHARE_SESSION_NOT_FOUND
(122) - The share session is not found.INVALID_SHARE_SESSION_EPOCH
(123) - The share session epoch is invalid.FENCED_STATE_EPOCH
(124) - The share coordinator rejected the request because share-group state epoch did not match.
...
The KIP introduces version 56.
Request schema
Version 5 6 adds the new key type of FindCoordinatorRequest.CoordinatorType.SHARE
with value 2 with the key of "group:topicId:partition"
.
Code Block |
---|
{ "apiKey": 10, "type": "request", "listeners": ["zkBroker", "broker"], "name": "FindCoordinatorRequest", // Version 1 adds KeyType. // // Version 2 is the same as version 1. // // Version 3 is the first flexible version. // // Version 4 adds support for batching via CoordinatorKeys (KIP-699) // // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890). // // Version 6 adds support for share groups (KIP-932). "validVersions": "0-56", "deprecatedVersions": "0", "flexibleVersions": "3+", "fields": [ { "name": "Key", "type": "string", "versions": "0-3", "about": "The coordinator key." }, { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false, "about": "The coordinator key type. (Group, transaction, share, etc.)" }, { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+", "about": "The coordinator keys." } ] } |
Response schema
Version 5 6 is the same as version 45.
ListGroups API
This KIP introduces version 6
Request schema
ShareGroupHeartbeat API
The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.
Request schema
The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeatVersion 6 adds support for share groups.
Code Block |
---|
{ "apiKey": 1676, "type": "request", "listeners": ["zkBroker", "broker"], "name": "ListGroupsRequestShareGroupHeartbeatRequest", // Version 1 and 2 are the same as version 0. // // Version 3 is the first flexible version. // // Version 4 adds the StatesFilter field (KIP-518). // // Version 5 adds the TypesFilter field (KIP-848). // // Version 6 adds support for share groups. "validVersions": "0-6", "flexibleVersions": "3+", "fields": ["validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." }, { "name": "StatesFilterMemberEpoch", "type": "[]stringint32", "versions": "40+", "about": "The statescurrent ofmember the groups we wantepoch; 0 to list.join Ifthe empty, all groups are returned with their stategroup; -1 to leave the group." }, { "name": "TypesFilterRackId", "type": "[]string", "versions": "0+", "nullableVersions": "50+", "default": "null", "about": "Thenull typesif ofnot theprovided groupsor weif wantit to list. If empty, all groups are returned with their typedidn't change since the last heartbeat; the rack ID of consumer otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." } ] } |
Response schema
Version 6 is the same as version 5.
ShareGroupHeartbeat API
The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.
Request schema
The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeat.coordinator will only send the Assignment field when it changes.
Code Block |
---|
{ "apiKey": TBD76, "type": "request", "listeners": ["broker"]response", "name": "ShareGroupHeartbeatRequestShareGroupHeartbeatResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." },// Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_MEMBER_ID (version 0+) // - GROUP_MAX_SIZE_REACHED (version 0+) "fields": [ { "name": "MemberIdThrottleTimeMs", "type": "stringint32", "versions": "0+", "about": "The duration memberin IDmilliseconds generatedfor bywhich the coordinator. The member ID must be kept during the entire lifetime of the member request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "MemberEpochErrorCode", "type": "int32int16", "versions": "0+", "about": "The top-level currenterror membercode, epoch;or 0 toif jointhere thewas group; -1 to leave the group.no error" }, { "name": "RackIdErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "nullThe iftop-level noterror providedmessage, or null if itthere didn'twas change since the last heartbeat; the rack ID of consumer otherwiseno error." }, { "name": "SubscribedTopicNamesMemberId", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "nullThe member ifID itgenerated didn'tby changethe sincecoordinator. theOnly lastprovided heartbeat;when the subscribed topic names otherwisemember joins with MemberEpoch == 0." }, ]} ] } |
Response schema
The group coordinator will only send the Assignment field when it changes.
Code Block |
---|
{ "apiKey": TBD, "type": "response", { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The member epoch." }, { "name": "ShareGroupHeartbeatResponseHeartbeatIntervalMs", "validVersionstype": "0int32", "flexibleVersionsversions": "0+", // Supported errors:"about": "The heartbeat interval in milliseconds." }, // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_MEMBER_ID (version 0+) // - GROUP_MAX_SIZE_REACHED (version 0+) { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not provided; the assignment otherwise.", "fields": [ { "name": "ThrottleTimeMsTopicPartitions", "type": "int32[]TopicPartitions", "versions": "0+", "about": "The durationpartitions in milliseconds for whichassigned to the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, member." } ]} ], "commonStructs": [ { "name": "TopicPartitions", "versions": "0+", "fields": [ { "name": "ErrorCodeTopicId", "type": "int16uuid", "versions": "0+", "about": "The top-level error code, or 0 if there was no error" }, topic ID." }, { "name": "ErrorMessagePartitions", "type": "string[]int32", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-level error message, or null if there was no error." }, { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersionspartitions." } ]} ] } |
ShareGroupDescribe API
The ShareGroupDescribe API is used to describe share groups.
Request schema
Code Block |
---|
{ "apiKey": 77, "type": "request", "listeners": ["broker"], "name": "ShareGroupDescribeRequest", "validVersions": "0+", "defaultflexibleVersions": "null0+", "aboutfields": "The member ID generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },[ { "name": "MemberEpochGroupIds", "type": "int32[]string", "versions": "0+", "entityType": "groupId", "about": "The member epoch. ids of the groups to describe" }, { "name": "HeartbeatIntervalMsIncludeAuthorizedOperations", "type": "int32bool", "versions": "0+", "about": "TheWhether heartbeatto intervalinclude inauthorized millisecondsoperations." }, ] } |
Response schema
Code Block |
---|
{ { "nameapiKey": "Assignment"77, "type": "Assignmentresponse", "versionsname": "0+ShareGroupDescribeResponse", "nullableVersionsvalidVersions": "0+", "defaultflexibleVersions": "null0+", // Supported "about"errors: "null if// not provided; the assignment otherwise.", "fields": [- GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - INVALID_REQUEST (version 0+) // - INVALID_GROUP_ID (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) "fields": [ { "name": "TopicPartitionsThrottleTimeMs", "type": "[]TopicPartitionsint32", "versions": "0+", "about": "The duration in partitionsmilliseconds assignedfor towhich the member." } ]} ], "commonStructs": [request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Groups", "type": "TopicPartitions[]DescribedGroup", "versions": "0+", "about": "Each described group.", "fields": [ { "name": "TopicIdErrorCode", "type": "uuidint16", "versions": "0+", "about": "The topic ID describe error, or 0 if there was no error." }, { "name": "PartitionsErrorMessage", "type": "[]int32string", "versions": "0+", "aboutnullableVersions": "The partitions." } 0+", "default": "null", ]} ] } |
ShareGroupDescribe API
The ShareGroupDescribe API is used to describe share groups.
Request schema
Code Block |
---|
{ "apiKeyabout": NN, "type "The top-level error message, or null if there was no error." }, { "name": "requestGroupId", "listenerstype": ["brokerstring"], "nameversions": "ShareGroupDescribeRequest0+", "validVersionsentityType": "0groupId", "flexibleVersionsabout": "0+"The group ID string." }, "fields": [ { "name": "GroupIdsGroupState", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The ids of group state string, or the groups to describeempty string." }, { "name": "IncludeAuthorizedOperationsGroupEpoch", "type": "boolint32", "versions": "0+", "about": "WhetherThe to include authorized operationsgroup epoch." }, ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, { "typename": "responseAssignmentEpoch", "nametype": "ShareGroupDescribeResponseint32", "validVersionsversions": "0+", "flexibleVersions "about": "0+"The assignment epoch." }, // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - INVALID_REQUEST (version 0+) // - INVALID_GROUP_ID (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) "fields": [ { "name": "AssignorName", "type": "string", "versions": "0+", "about": "The selected assignor." }, { "name": "ThrottleTimeMsMembers", "type": "int32[]Member", "versions": "0+", "about": "The durationmembers.", in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, "fields": [ { "name": "GroupsMemberId", "type": "[]DescribedGroupstring", "versions": "0+", "about": "EachThe describedmember groupID." }, "fields": [ { "name": "ErrorCodeRackId", "type": "int16string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The describemember error, or 0 if there was no errorrack ID." }, { "name": "ErrorMessageMemberEpoch", "type": "stringint32", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-levelcurrent error message, or null if there was no errormember epoch." }, { "name": "GroupIdClientId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The groupclient ID string." }, { "name": "GroupStateClientHost", "type": "string", "versions": "0+", "about": "The groupclient statehost." string}, or the empty string." }, { "name": "GroupEpochSubscribedTopicNames", "type": "int32[]string", "versions": "0+", "entityType": "topicName", "about": "The subscribed grouptopic epochnames." }, { "name": "AssignmentEpochAssignment", "type": "int32Assignment", "versions": "0+", "about": "The current assignment epoch." } ]}, { "name": "AssignorNameAuthorizedOperations", "type": "stringint32", "versions": "0+", "default": "-2147483648", "about": "The selected assignor32-bit bitfield to represent authorized operations for this group." }, ] } ], { "namecommonStructs": "Members", [ { "typename": "[]MemberTopicPartitions", "versions": "0+", "fields": [ { "aboutname": "The members.", "fields": [ { "name": "MemberId""TopicId", "type": "stringuuid", "versions": "0+", "about": "The membertopic ID." }, { "name": "RackIdTopicName", "type": "string", "versions": "0+", "nullableVersionsentityType": "0+topicName", "default": "null", "about": "The membertopic rack IDname." }, { "name": "MemberEpochPartitions", "type": "[]int32", "versions": "0+", "about": "The current member epochpartitions." }, ]}, { "name": "ClientIdAssignment", "type": "string", "versions": "0+", "about"fields": "The client ID." }, [ { "name": "ClientHostTopicPartitions", "type": "string[]TopicPartitions", "versions": "0+", "about": "The client host assigned topic-partitions to the member." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName", ]} ] } |
ShareFetch API
The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.
The first request from a share consumer to a share-partition leader broker establishes a share session by setting MemberId
to the member ID it received from the group coordinator and ShareSessionEpoch
to 0. Then each subsequent ShareFetch
or ShareAcknowledge
request specifies the MemberId
and increments the ShareSessionEpoch
by one. When the share consumer wishes to close the share session, it sets MemberId
to the member ID and ShareSessionEpoch
to -1.
When piggybacking acknowledgements in this request, there are a few special cases.
- If acknowledgements are being made for a partition and no records should be fetched,
PartitionMaxBytes
should be set to zero. - If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the
Topics
array withPartitionMaxBytes
set to zero AND the partition is included inForgottenTopicsData
. - If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the
Topics
array andShareSessionEpoch
is set to -1. No data will be fetched and it is not necessary to include the partition inForgottenTopicsData
. - If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the
AcknowledgeErrorCode
in the response will be set to the same value for all partitions which had piggybacked acknowledgements.
Request schema
For the AcknowledgementBatches
of each topic-partition, the FirstOffsets
must be ascending order and the ranges must be non-overlapping.
For each AcknowledgementBatch
, the array of AcknowledgeType
entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.
Code Block |
---|
{ "apiKey": 78, "type": "request", "listeners": ["broker"], "name": "ShareFetchRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The member ID." }, { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+", "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." }, { "name": "MaxWaitMs", "type": "int32", "versions": "0+", "about": "The maximum time in milliseconds to wait for the response." }, { "name": "MinBytes", "type": "int32", "versions": "0+", "about": "The minimum bytes to accumulate in the response." }, { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true, "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, { "name": "Topics", "type": "[]FetchTopic", "versions": "0+", "about": "The subscribedtopics topicto namesfetch." }, , "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."}, { "name": "AssignmentPartitions", "type": "Assignment[]FetchPartition", "versions": "0+", "about": "The partitions to fetch.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The currentpartition assignmentindex." }, { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+", "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." ]}, { "name": "AuthorizedOperationsAcknowledgementBatches", "type": "int32[]AcknowledgementBatch", "versions": "0+", "default": "-2147483648", "about": "32-bitRecord bitfieldbatches to represent authorized operations for this groupacknowledge." }, "fields": [ ] } ], { "commonStructsname": [ { "name"FirstOffset", "type": "TopicPartitionsint64", "versions": "0+", "fields "about": [ "First offset of batch of records to acknowledge."}, { "name": "TopicIdLastOffset", "type": "uuidint64", "versions": "0+", "about": "The topic ID." Last offset (inclusive) of batch of records to acknowledge."}, { "name": "TopicNameAcknowledgeTypes", "type": "string[]int8", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."} ]} ]} ]}, { "name": "PartitionsForgottenTopicsData", "type": "[]int32ForgottenTopic", "versions": "0+", "ignorable": false, "about": "The partitions." } to ]}, { "name": "Assignment", "versions": "0+remove from this share session.", "fields": [ { "name": "TopicPartitionsTopicId", "type": "[]TopicPartitionsuuid", "versions": "0+", "ignorable": true, "about": "The assigned topic-partitions to the member." } ]} ] } |
ShareFetch API
The ShareFetch API is used by share group consumers to fetch acquired records from share-partition leaders. It is also possible to piggyback acknowledgements in this request to reduce the number of round trips.
The first request from a share consumer to a share-partition leader broker establishes a share session by setting MemberId
to the member ID it received from the group coordinator and ShareSessionEpoch
to 0. Then each subsequent ShareFetch
or ShareAcknowledge
request specifies the MemberId
and increments the ShareSessionEpoch
by one. When the share consumer wishes to close the share session, it sets MemberId
to the member ID and ShareSessionEpoch
to -1.
When piggybacking acknowledgements in this request, there are a few special cases.
- If acknowledgements are being made for a partition and no records should be fetched,
PartitionMaxBytes
should be set to zero. - If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the
Topics
array withPartitionMaxBytes
set to zero AND the partition is included inForgottenTopicsData
. - If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the
Topics
array andShareSessionEpoch
is set to -1. No data will be fetched and it is not necessary to include the partition inForgottenTopicsData
. - If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the
AcknowledgeErrorCode
in the response will be set to the same value for all partitions which had piggybacked acknowledgements.
Request schema
For the AcknowledgementBatches
of each topic-partition, the FirstOffsets
must be ascending order and the ranges must be non-overlapping.
For each AcknowledgementBatch
, the array of AcknowledgeType
entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.
"The unique topic ID."},
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions indexes to forget." }
]}
]
} |
Response schema
Code Block |
---|
{
"apiKey": 78,
"type": "response",
"name": "ShareFetchResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// Supported errors for ErrorCode and AcknowledgeErrorCode:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - TOPIC_AUTHORIZATION_FAILED (version 0+)
// - SHARE_SESSION_NOT_FOUND (version 0+)
// - INVALID_SHARE_SESSION_EPOCH (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - NOT_LEADER_OR_FOLLOWER (version 0+)
// - UNKNOWN_TOPIC_ID (version 0+)
// - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode
// - KAFKA_STORAGE_ERROR (version 0+)
// - CORRUPT_MESSAGE (version 0+)
// - INVALID_REQUEST (version 0+)
// - UNKNOWN_SERVER_ERROR (version 0+) |
Code Block |
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "ShareFetchRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupIdThrottleTimeMs", "type": "stringint32", "versions": "0+", "nullableVersionsignorable": "0+", "default": "null", "entityType": "groupId", "about": "The group identifiertrue, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "MemberIdErrorCode", "type": "stringint16", "versions": "0+", "nullableVersionsignorable": "0+"true, "about": "The member ID top-level response error code." }, { "name": "ShareSessionEpochErrorMessage", "type": "int32string", "versions": "0+", "nullableVersions": "0+", "aboutdefault": "null"The, current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests"about": "The top-level error message, or null if there was no error." }, { "name": "MaxWaitMsResponses", "type": "int32[]ShareFetchableTopicResponse", "versions": "0+", "about": "The maximum time in milliseconds to wait for the response." }, response topics.", "fields": [ { "name": "MinBytesTopicId", "type": "int32uuid", "versions": "0+", "ignorable": true, "about": "The minimumunique bytes to accumulate in the responsetopic ID." }, { "name": "MaxBytesPartitions", "type": "int32[]PartitionData", "versions": "0+", "default": "0x7fffffff", "ignorable": true, "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, topic partitions.", "fields": [ { "name": "TopicsPartitionIndex", "type": "[]FetchTopicint32", "versions": "0+", "about": "The topicspartition to fetchindex." }, "fields": [ { "name": "TopicIdErrorCode", "type": "uuidint16", "versions": "0+", "ignorable": true, "about": "The unique topic ID."fetch error code, or 0 if there was no fetch error." }, { "name": "PartitionsErrorMessage", "type": "[]FetchPartitionstring", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The partitionsfetch toerror fetch."message, "fields": [ or null if there was no fetch error." }, { "name": "PartitionIndexAcknowledgeErrorCode", "type": "int32int16", "versions": "0+", "about": "The partition index": "The acknowledge error code, or 0 if there was no acknowledge error." }, { "name": "PartitionMaxBytesAcknowledgeErrorMessage", "type": "int32string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The maximumacknowledge byteserror tomessage, fetchor fromnull thisif partition. 0 when only acknowledgement withthere was no fetchingacknowledge is required. See KIP-74 for cases where this limit may not be honored.error." }, { "name": "AcknowledgementBatchesCurrentLeader", "type": "[]AcknowledgementBatchLeaderIdAndEpoch", "versions": "0+", "about": "Record batches to acknowledge.", "fields": [ { "name": "FirstOffsetLeaderId", "type": "int64int32", "versions": "0+", "about": "FirstThe offsetID of batch of records to acknowledge." the current leader or -1 if the leader is unknown." }, { "name": "LastOffsetLeaderEpoch", "type": "int64int32", "versions": "0+", "about": "The latest known leader epoch."Last } offset (inclusive) of batch of records to acknowledge."]}, { "name": "AcknowledgeTypesRecords", "type": "[]int8records", "versions": "0+", "about"nullableVersions": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."} ]} ]}0+", "about": "The record data."}, ]}, { "name": "ForgottenTopicsDataAcquiredRecords", "type": "[]ForgottenTopicAcquiredRecords", "versions": "0+", "ignorableabout": false, "The acquired records.", "fields": [ {"name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The partitionsearliest offset toin removethis frombatch thisof shareacquired sessionrecords."}, "fields": [ { "name": "TopicIdLastOffset", "type": "uuidint64", "versions": "0+", "ignorable": true, "about": "The unique topic ID last offset of this batch of acquired records."}, { "name": "PartitionsDeliveryCount", "type": "[]int32int16", "versions": "0+", "about": "The partitions indexes to forget." } delivery count of this batch of acquired records."} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN ]}, "type": "response", { "name": "ShareFetchResponseNodeEndpoints", "validVersionstype": "0[]NodeEndpoint", "flexibleVersionsversions": "0+", // Supported errors "about": "Endpoints for all ErrorCodecurrent leaders andenumerated AcknowledgeErrorCode: in PartitionData //with -error GROUPNOT_LEADER_AUTHORIZATION_FAILED (version 0+)OR_FOLLOWER.", "fields": [ // - TOPIC_AUTHORIZATION_FAILED (version 0+) // - SHARE_SESSION_NOT_FOUND (version 0+) // - INVALID_SHARE_SESSION_EPOCH (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - NOT_LEADER_OR_FOLLOWER (version 0+) // - UNKNOWN_TOPIC_ID (version 0+) // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode // - KAFKA_STORAGE_ERROR (version 0+) // - CORRUPT_MESSAGE (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." }, { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname." }, { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port." }, { "name": "ThrottleTimeMsRack", "type": "int32string", "versions": "0+", "ignorable"nullableVersions": "0+", "default": true"null", "about": "The durationrack in milliseconds for which of the requestnode, wasor throttlednull dueif toit ahas quotanot violation,been orassigned zeroto ifa the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true, "about": "The top-level response error code." },rack." } ]} ] } |
ShareAcknowledge API
The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.
Request schema
For the AcknowledgementBatches
of each topic-partition, the FirstOffsets
must be ascending order and the ranges must be non-overlapping.
For each AcknowledgementBatch
, the array of AcknowledgeType
entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.
Code Block |
---|
{ "apiKey": 79, "type": "request", "listeners": ["broker"], "name": "ShareAcknowledgeRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ErrorMessageGroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId", "about": "The top-level error message, or null if there was no errorgroup identifier." }, { "name": "ResponsesMemberId", "type": "[]ShareFetchableTopicResponsestring", "versions": "0+", "nullableVersions": "0+", "about": "The responsemember topicsID.", "fields": [ }, { "name": "TopicIdShareSessionEpoch", "type": "uuidint32", "versions": "0+", "ignorable": true, "about": "The unique topic ID." current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." }, { "name": "PartitionsTopics", "type": "[]PartitionDataAcknowledgeTopic", "versions": "0+", "about": "The topic partitions topics containing records to acknowledge.", "fields": [ { "name": "PartitionIndexTopicId", "type": "int32uuid", "versions": "0+", "about": "The unique partitiontopic indexID." }, { "name": "ErrorCodePartitions", "type": "int16[]AcknowledgePartition", "versions": "0+", "about": "The fetchpartitions errorcontaining code,records or 0 if there was no fetch error." },to acknowledge.", "fields": [ { "name": "ErrorMessagePartitionIndex", "type": "stringint32", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The fetch error message, or null if there was no fetch error"about": "The partition index." }, { "name": "AcknowledgeErrorCodeAcknowledgementBatches", "type": "int16[]AcknowledgementBatch", "versions": "0+", "about": "TheRecord acknowledgebatches errorto codeacknowledge.", or 0 if there was no acknowledge error." }, "fields": [ { "name": "AcknowledgeErrorMessageFirstOffset", "type": "stringint64", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "TheFirst acknowledgeoffset errorof message,batch orof nullrecords if there was no acknowledge error." }, to acknowledge."}, { "name": "CurrentLeaderLastOffset", "type": "LeaderIdAndEpochint64", "versions": "0+", "fields": [ "about": "Last offset (inclusive) of batch of records to acknowledge."}, { "name": "LeaderIdAcknowledgeTypes", "type": "int32[]int8", "versions": "0+", "about": "TheArray IDof ofacknowledge thetypes current leader or -1 if the leader is unknown." }, - 0:Gap,1:Accept,2:Release,3:Reject."} ]} ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": 79, "type": "response", "name": "LeaderEpochShareAcknowledgeResponse", "typevalidVersions": "int320", "versionsflexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) "about": "The latest known leader epoch." } ]}, { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}, { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields": // - TOPIC_AUTHORIZATION_FAILED (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - SHARE_SESSION_NOT_FOUND (version 0+) // - INVALID_SHARE_SESSION_EPOCH (version 0+) // - NOT_LEADER_OR_FOLLOWER (version 0+) // - UNKNOWN_TOPIC_ID (version 0+) // - INVALID_RECORD_STATE (version 0+) // - KAFKA_STORAGE_ERROR (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { {"name": "FirstOffsetThrottleTimeMs", "type": "int64int32", "versions": "0+", "ignorable": true, "about": "The earliest offset in this batch of acquired records."}, { duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "LastOffsetErrorCode", "type": "int64int16", "versions": "0+", "aboutignorable": "The last offset of this batch of acquired records."}, true, "about": "The top level response error code." }, { "name": "DeliveryCountErrorMessage", "type": "int16string", "versions": "0+", "aboutnullableVersions": "The delivery count of this batch of acquired records."}0+", "default": "null", "about": ]} ]} ]"The top-level error message, or null if there was no error." }, { "name": "NodeEndpointsResponses", "type": "[]NodeEndpointShareAcknowledgeTopicResponse", "versions": "0+", "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [The response topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."}, { "name": "NodeIdPartitions", "type": "int32[]PartitionData", "versions": "0+", "mapKeyabout": true, "entityType": "brokerId""The topic partitions.", "aboutfields": "The ID of the associated node." }, [ { "name": "HostPartitionIndex", "type": "stringint32", "versions": "0+", "about": "The node'spartition hostnameindex." }, { "name": "PortErrorCode", "type": "int32int16", "versions": "0+", "about": "The node's port error code, or 0 if there was no error." }, { "name": "RackErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The rack of the nodeerror message, or null if itthere haswas not been assigned to a rack." } ]} ] } |
ShareAcknowledge API
The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.
Request schema
For the AcknowledgementBatches
of each topic-partition, the FirstOffsets
must be ascending order and the ranges must be non-overlapping.
For each AcknowledgementBatch
, the array of AcknowledgeType
entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], no error." }, { "name": "ShareAcknowledgeRequestCurrentLeader", "validVersionstype": "0LeaderIdAndEpoch", "flexibleVersionsversions": "0+", "fields": [ { "name": "GroupIdLeaderId", "type": "stringint32", "versions": "0+", "nullableVersions "about": "The ID of the current leader or -1 if the leader is unknown." }, { "name": "0+LeaderEpoch", "defaulttype": "nullint32", "entityTypeversions": "groupId0+", "about": "The latest known groupleader identifierepoch." }, ]} ]} ]}, { "name": "MemberIdNodeEndpoints", "type": "string[]NodeEndpoint", "versions": "0+", "nullableVersionsabout": "0+", "about": "The member ID." }, Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [ { "name": "ShareSessionEpochNodeId", "type": "int32", "versions": "0+", "aboutmapKey": true, "entityType"The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests: "brokerId", "about": "The ID of the associated node." }, { "name": "TopicsHost", "type": "[]AcknowledgeTopicstring", "versions": "0+", "about": "The topics containing records to acknowledgenode's hostname.", "fields": [ }, { "name": "TopicIdPort", "type": "uuidint32", "versions": "0+", "about": "The unique topic IDnode's port." }, { "name": "PartitionsRack", "type": "[]AcknowledgePartitionstring", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The partitions containing records to acknowledge.", "fields": [ rack of the node, or null if it has not been assigned to a rack." } ]} ] } |
AlterShareGroupOffsets API
The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The group coordinator serves this API.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "AlterShareGroupOffsetsRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "PartitionIndexGroupId", "type": "string", "versions": "int320+", "versionsentityType": "0+groupId", "about": "The partitiongroup indexidentifier." }, { "name": "AcknowledgementBatchesTopics", "type": "[]AcknowledgementBatchAlterShareGroupOffsetsRequestTopic", "versions": "0+", "about": "RecordThe batchestopics to alter acknowledgeoffsets for.", "fields": [ { "name": "FirstOffsetTopicName", "type": "int64string", "versions": "0+", "entityType": "topicName", "mapKey": true, "about": "FirstThe offset of batch of records to acknowledgetopic name." }, { "name": "LastOffsetPartitions", "type": "int64[]AlterShareGroupOffsetsRequestPartition", "versions": "0+", "about": "LastEach offsetpartition (inclusive)to ofalter batch of records to acknowledgeoffsets for."}, "fields": [ { "name": "AcknowledgeTypesPartitionIndex", "type": "[]int8int32", "versions": "0+", "about": "ArrayThe ofpartition acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."} ]index." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "ShareAcknowledgeResponseAlterShareGroupOffsetsResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - TOPIC_AUTHORIZATION_FAILED (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - SHARE_SESSION_NOT_FOUNDNOT_COORDINATOR (version 0+) // - INVALID_SHARECOORDINATOR_SESSIONNOT_EPOCHAVAILABLE (version 0+) // - NOTCOORDINATOR_LEADERLOAD_ORIN_FOLLOWERPROGRESS (version 0+) // - UNKNOWNGROUP_ID_TOPICNOT_IDFOUND (version 0+) // - INVALIDGROUP_RECORDNOT_STATEEMPTY (version 0+) // - KAFKA_STORAGE_ERROR (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCodeResponses", "type": "int16[]AlterShareGroupOffsetsResponseTopic", "versions": "0+", "ignorableabout": true, "The results for each topic.", "fields": [ "about{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "The top level response error code"topicName", "about": "The topic name." }, { "name": "ErrorMessageTopicId", "type": "stringuuid", "versions": "0+", "nullableVersionsignorable": true, "about": "The unique topic ID." }, { "name": "Partitions", "type": "0+[]AlterShareGroupOffsetsResponsePartition", "defaultversions": "null0+", "fields": [ { "aboutname": "PartitionIndex", "The top-level error message, or null if there was no errortype": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ResponsesErrorCode", "type": "[]ShareAcknowledgeTopicResponseint16", "versions": "0+", "about": "The response topics.", "fields": [ error code, or 0 if there was no error." }, { "name": "TopicIdErrorMessage", "type": "uuidstring", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null", "about": "The unique topic ID."},error message, or null if there was no error." } ]} ]} ] } |
DeleteShareGroupOffsets API
The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The group coordinator serves this API.
Request schema
Code Block |
---|
{ "apiKey": NN, "nametype": "Partitionsrequest", "listeners": "type["broker"], "name": "[]PartitionDataDeleteShareGroupOffsetsRequest", "versionsvalidVersions": "0+", "about "flexibleVersions": "The topic partitions.0+", "fields": [ { "name": "PartitionIndexGroupId", "type": "int32string", "versions": "0+", "entityType": "groupId", "about": "The partitiongroup indexidentifier." }, { "name": "ErrorCodeTopics", "type": "int16[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+", "about": "The error code, or 0 if there was no error." }, topics to delete offsets for.", "fields": [ { "name": "ErrorMessageTopicName", "type": "string", "versions": "0+", "nullableVersionsentityType": "0+", "default": "null"topicName", "about": "The topic name." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "DeleteShareGroupOffsetsResponse", "validVersions error message, or null if there was no error." }, { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fieldsflexibleVersions": ["0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED { "name": "LeaderId", "type": "int32", "versions": "0+", "about": "The ID of the current leader or -1 if the leader is unknown." }, (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - GROUP_NOT_EMPTY (version 0+) // - KAFKA_STORAGE_ERROR (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "LeaderEpochThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in latestmilliseconds knownfor leaderwhich epoch." } ]} ]} ]the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "NodeEndpointsResponses", "type": "[]NodeEndpointDeleteShareGroupOffsetsResponseTopic", "versions": "0+", "about": "EndpointsThe results for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWEReach topic.", "fields": [ { "name": "NodeIdTopicName", "type": "int32string", "versions": "0+", "entityType": "topicName", "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated nodetopic name." }, { "name": "HostTopicId", "type": "stringuuid", "versions": "0+", "ignorable": true, "about": "The unique node'stopic hostnameID." }, { "name": "PortErrorCode", "type": "int32int16", "versions": "0+", "about": "The node's port error code, or 0 if there was no error." }, { "name": "RackErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null", "about": "The rack of the nodeerror message, or null if itthere haswas not been assigned to a rackno error." } ]} ] } |
...
DescribeShareGroupOffsets API
The AlterShareGroupOffsets DescribeShareGroupOffsets API is used to alter describe the share-partition start offsets for the share-partitions in a share group. The group coordinator serves this API.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "AlterShareGroupOffsetsRequestDescribeShareGroupOffsetsRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopicDescribeShareGroupOffsetsRequestTopic", "versions": "0+", "about": "The topics to alterdescribe offsets for.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true, "about": "The topic name." }, { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+", "about": "Each partition to alter offsets for.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition indexpartitions." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "AlterShareGroupOffsetsResponseDescribeShareGroupOffsetsResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - GROUP_NOT_EMPTY (version 0+) // - KAFKA_STORAGE_ERROR (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopicDescribeShareGroupOffsetsResponseTopic", "versions": "0+", "about": "The results for each topic.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID." }, { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "The unique topic ID." }, 0+", "fields": [ { "name": "PartitionsPartitionIndex", "type": "[]AlterShareGroupOffsetsResponsePartitionint32", "versions": "0+", "fieldsabout": ["The partition index." }, { "name": "PartitionIndexStartOffset", "type": "int32int64", "versions": "0+", "about": "The share-partition start indexoffset." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null", "about": "The error message, or null if there was no error." } ]} ]} ] } |
...
InitializeShareGroupStateAPI
The DeleteShareGroupOffsets InitializeShareGroupState API is used by the group coordinator to delete initialize the offsets for the share-partitions in a share group. The group coordinator serves this APIpartition state. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "DeleteShareGroupOffsetsRequestInitializeShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", " "about": "The group identifier." }, { "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopicInitializeStateData", "versions": "0+", "about": "The topicsdata tofor deletethe offsets for.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", topics.", "fields": [ { "name": "DeleteShareGroupOffsetsResponseTopicId", "validVersionstype": "0uuid", "flexibleVersionsversions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+)"about": "The topic identifier." }, // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - GROUP_NOT_EMPTY (version 0+) // - KAFKA_STORAGE_ERROR (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The data for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ThrottleTimeMsStateEpoch", "type": "int32", "versions": "0+", "ignorable": true, "about": "The durationstate in milliseconds for whichepoch of the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, share-partition." }, { "name": "ResponsesStartOffset", "type": "[]DeleteShareGroupOffsetsResponseTopicint64", "versions": "0+", "about": "The results for each topic.", "fields": [ share-partition start offset, or -1 if the start offset is not being initialized." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "name "apiKey": NN, "type": "TopicNameresponse", "typename": "stringInitializeShareGroupStateResponse", "versionsvalidVersions": "0+", "entityTypeflexibleVersions": "topicName",0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "aboutfields": "The topic name." },[ { "name": "TopicIdResults", "type": "uuid[]InitializeStateResult", "versions": "0+", "ignorable": true, "about": "The uniqueinitialization topic ID." }results", "fields": [ { "name": "ErrorCodeTopicId", "type": "int16uuid", "versions": "0+", "about": "The error code, or 0 if there was no error.topic identifier" }, { "name": "ErrorMessagePartitions", "type": "string[]PartitionResult", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null", "about": "The errorresults message, or null if there was no error." } ]} ] } |
DescribeShareGroupOffsets API
The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The group coordinator serves this API.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "DescribeShareGroupOffsetsRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ for the partitions.", "fields": [ { "name": "GroupIdPartition", "type": "stringint32", "versions": "0+", "entityType": "groupId", "about": "The grouppartition identifierindex." }, { "name": "TopicsErrorCode", "type": "[]DescribeShareGroupOffsetsRequestTopicint16", "versions": "0+", "about": "The topics to describe offsets for.", "fields": [ error code, or 0 if there was no error." }, { "name": "TopicNameErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "entityTypedefault": "topicNamenull", "about": "The topic nameerror message, or null if there was no error." }, ]} ]} ] } |
ReadShareGroupState API
The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "nameapiKey": "Partitions"NN, "type": "[]int32request", "versionslisteners": ["0+broker"], "name": "ReadShareGroupStateRequest", "validVersions": "0", "aboutflexibleVersions": "The partitions." }0+", "fields": [ ]} { "name": ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "GroupId", "type": "string", "versions": "0+", "about": "The group identifier." }, { "name": "DescribeShareGroupOffsetsResponseTopics", "validVersionstype": "0[]ReadStateData", "flexibleVersionsversions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ "about": "The data for the topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "ThrottleTimeMsPartitions", "type": "int32[]PartitionData", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota data for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ResponsesLeaderEpoch", "type": "[]DescribeShareGroupOffsetsResponseTopicint32", "versions": "0+", "about":, "The resultsleader epoch forof eachthe topicshare-partition.", "fields": [ } ]} ]} ] } |
Response schema
Code Block |
---|
{ "name "apiKey": NN, "type": "TopicNameresponse", "typename": "stringReadShareGroupStateResponse", "versionsvalidVersions": "0+", "entityTypeflexibleVersions": "topicName0+", // - NOT_COORDINATOR (version 0+) "about": "The topic name." }, // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_LEADER_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "TopicIdResults", "type": "uuid[]ReadStateResult", "versions": "0+", "ignorable": true, "about": "The unique topic ID." }, { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "read results", "fields": [ { "name": "PartitionIndexTopicId", "type": "int32uuid", "versions": "0+", "about": "The partitiontopic index.identifier" }, { "name": "StartOffsetPartitions", "type": "int64[]PartitionResult", "versions": "0+", "about": "The results share-partitionfor startthe offsetpartitions."}, "fields": [ { "name": "ErrorCodePartition", "type": "int16int32", "versions": "0+", "about": "The error code, or 0 if there was no errorpartition index." }, { "name": "ErrorMessageErrorCode", "type": "stringint16", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null", "about": "The error message, or null if there was no error." } ]} ]} ] } |
InitializeShareGroupStateAPI
The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "InitializeShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." } { "name": "GroupIdStateEpoch", "type": "stringint32", "versions": "0+", "about": "The group identifier state epoch of the share-partition." }, { "name": "TopicsStartOffset", "type": "[]InitializeStateDataint64", "versions": "0+", "about": "The data for the topics.", "fields": [ share-partition start offset, which can be -1 if it is not yet initialized." }, { "name": "TopicIdStateBatches", "type": "uuid[]StateBatch", "versions": "0+", "aboutfields":[ "The topic identifier." }, { "name": "PartitionsFirstOffset", "type": "[]PartitionDataint64", "versions": "0+", "about": "The first offset dataof forthis thestate partitionsbatch." }, "fields": [ { "name": "PartitionLastOffset", "type": "int32int64", "versions": "0+", "about": "The partition index last offset of this state batch." }, { "name": "StateEpochDeliveryState", "type": "int32int8", "versions": "0+", "about": "The delivery state epoch of the share-partition- 0:Available,2:Acked,4:Archived." }, { "name": "StartOffsetDeliveryCount", "type": "int64int16", "versions": "0+", "about": "The share-partition start offset, or -1 if the start offset is not being initialized." delivery count." } ]} ]} ]} ] } |
...
WriteShareGroupState API
The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "responserequest", "name": "InitializeShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]InitializeStateResult", "versionslisteners": ["broker"], "name": "WriteShareGroupStateRequest", "validVersions": "0+", "aboutflexibleVersions": "The initialization results0+", "fields": [ { "name": "TopicIdGroupId", "type": "uuidstring", "versions": "0+", "about": "The topicgroup identifier." }, { "name": "PartitionsTopics", "type": "[]PartitionResultWriteStateData", "versions": "0+", "about" : "The resultsdata for the partitionstopics.", "fields": [ { "name": "PartitionTopicId", "type": "int32uuid", "versions": "0+", "about": "The partitiontopic indexidentifier." }, { "name": "ErrorCodePartitions", "type": "int16[]PartitionData", "versions": "0+", "about": "The errordata code,for or 0 if there was no error." },the partitions.", "fields": [ { "name": "ErrorMessagePartition", "type": "stringint32", "versions": "0+", "nullableVersionsabout": "0+", "default": "null"The partition index." }, { "aboutname": "StateEpoch"The error message, or null if there was no error." } ]} ]} ] } |
ReadShareGroupState API
The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "ReadShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ , "type": "int32", "versions": "0+", "about": "The state epoch of the share-partition." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The leader epoch of the share-partition." }, { "name": "GroupIdStartOffset", "type": "stringint64", "versions": "0+", "about": "The group identifier share-partition start offset, or -1 if the start offset is not being written." }, { "name": "TopicsStateBatches", "type": "[]ReadStateDataStateBatch", "versions": "0+", "aboutfields": "The data for the topics.", "fields": [ [ { "name": "TopicIdFirstOffset", "type": "uuidint64", "versions": "0+", "about": "The topic identifierfirst offset of this state batch." }, { "name": "PartitionsLastOffset", "type": "[]PartitionDataint64", "versions": "0+", "about": "The last offset dataof forthis thestate partitionsbatch." }, "fields": [ { "name": "PartitionDeliveryState", "type": "int32int8", "versions": "0+", "about": "The delivery state partition index.- 0:Available,2:Acked,4:Archived" }, { "name": "LeaderEpochDeliveryCount", "type": "int32int16", "versions": "0+", "about",: "The leader epoch of the share-partition." delivery count." } ]} ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "ReadShareGroupStateResponseWriteShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_LEADER_EPOCH (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]ReadStateResultWriteStateResult", "versions": "0+", "about": "The readwrite results", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier" }, { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+", "about" : "The results for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." } { "name": "StateEpoch", ]} ]} ] } |
DeleteShareGroupState API
The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "int32request", "versionslisteners": ["0+broker"], "name": "DeleteShareGroupStateRequest", "validVersions": "0", "aboutflexibleVersions": "The state epoch of the share-partition." }0+", "fields": [ { "name": "StartOffsetGroupId", "type": "int64string", "versions": "0+", "about": "The share-partition start offset, which can be -1 if it is not yet initializedgroup identifier." }, { "name": "StateBatchesTopics", "type": "[]StateBatchDeleteStateData", "versions": "0+", "fieldsabout":[ "The data for the topics.", "fields": [ { "name": "FirstOffsetTopicId", "type": "int64uuid", "versions": "0+", "about": "The first offset of this state batchtopic identifier." }, { "name": "LastOffsetPartitions", "type": "int64[]PartitionData", "versions": "0+", "about": "The lastdata offsetfor of this state batchthe partitions." }, "fields": [ { "name": "DeliveryStatePartition", "type": "int8int32", "versions": "0+", "about": "The delivery state - 0:Available,2:Acked,4:Archivedpartition index." }, ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "DeliveryCountDeleteShareGroupStateResponse", "typevalidVersions": "int160", "versionsflexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // "about": "The delivery count." } ]} ]} ]} ] } |
WriteShareGroupState API
The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "WriteShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+", "about": "The delete results", "fields": [ { "name": "GroupIdTopicId", "type": "stringuuid", "versions": "0+", "about": "The grouptopic identifier." }, { "name": "TopicsPartitions", "type": "[]WriteStateDataPartitionResult", "versions": "0+", "about": "The dataresults for the topicspartitions.", "fields": [ { "name": "TopicIdPartition", "type": "uuidint32", "versions": "0+", "about": "The topicpartition identifierindex." }, { "name": "PartitionsErrorCode", "type": "[]PartitionDataint16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." } ]} ]} ] } |
ReadShareGroupStateSummary API
The ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, data for the partitions.", "fields": [ { "name": "Partition", "type": "int32request", "versionslisteners": ["0+broker"], "aboutname": "The partition index." }ReadShareGroupStateSummaryRequest", { "name"validVersions": "StateEpoch0", "type": "int32", "versionsflexibleVersions": "0+", "about"fields": "The state epoch of the share-partition." }, [ { "name": "LeaderEpochGroupId", "type": "int32string", "versions": "0+", "about": "The leader epoch of the share-partitiongroup identifier." }, { "name": "StartOffsetTopics", "type": "int64[]ReadStateSummaryData", "versions": "0+", "about": "The share-partition start offset, or -1 if the start offset is not being written." }, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+"data for the topics.", "fields": [ { "name": "FirstOffsetTopicId", "type": "int64uuid", "versions": "0+", "about": "The first offset of this state batchtopic identifier." }, { "name": "LastOffsetPartitions", "type": "int64[]PartitionData", "versions": "0+", "about": "The lastdata offsetfor of this state batchthe partitions." },, "fields": [ { "name": "DeliveryStatePartition", "type": "int8int32", "versions": "0+", "about": "The delivery state - 0:Available,2:Acked,4:Archivedpartition index." }, { "name": "DeliveryCountLeaderEpoch", "type": "int16int32", "versions": "0+", "about": "The deliveryleader count." } ]epoch of the share-partition." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "WriteShareGroupStateResponseReadShareGroupStateSummaryResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_LEADER_EPOCH (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]WriteStateResultReadStateSummaryResult", "versions": "0+", "about": "The writeread results", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier" }, { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+", "about" : "The results for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." } { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch of the share-partition." }, { "name": "ErrorMessageStartOffset", "type": "stringint64", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." share-partition start offset." } ]} ]} ]} ] } |
DeleteShareGroupState API
The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
...
]
} |
Records
This section describes the new record types.
Group and assignment metadata
In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets
topic.
With the exception of the ShareGroupStatePartitionMetadata record, all of the record types are analogous to those introduced in KIP-848 for consumer groups.
For each share group, a ShareGroupMetadata record is written for the group epoch, and a ShareGroupPartitionMetadata record is written for the topic-partitions being assigned by the group. When the group is deleted, a tombstone records are written.
For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.
For each share group, a ShareGroupTargetAssignmentMetadata record is written to record the group epoch used to compute the assignment. For each member, there is a ShareGroupTargetAssignmentMember record which persists the target assignment, and a ShareGroupCurrentMemberAssignment record which persists the current assignment and is also used to keep track of the member epoch.
There is also a ShareGroupStatePartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
ShareGroupMetadataKey
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"]data", "name": "DeleteShareGroupStateRequestShareGroupMetadataKey", "validVersions": "011", "flexibleVersions": "0+none", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about":"The group identifier." }, { "name": "Topics", "type": "[]DeleteStateData", "versions": "0+", "about": "The data for the topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", 11", "about": "The topicgroup identifierid." }, ] } |
ShareGroupMetadataValue
Code Block |
---|
{ { "nametype": "Partitionsdata", "typename": "[]PartitionDataShareGroupMetadataValue", "versionsvalidVersions": "0+", "about"flexibleVersions": "The data for the partitions.0+", "fields": [ { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "about": "The partitiongroup indexepoch." } ]} ]} ] } |
...
ShareGroupPartitionMetadataKey
Code Block |
---|
{ "apiKey": NN, "type": "responsedata", "name": "DeleteShareGroupStateResponseShareGroupPartitionMetadataKey", "validVersions": "09", "flexibleVersions": "0+none", "fields": [ { "name": "GroupId", "type": "string", "versions": "9", "about": "The group id." } ] } |
ShareGroupPartitionMetadataValue
Code Block |
---|
{ "type": "data", "name": "ShareGroupPartitionMetadataValue", "validVersions": "0", "flexibleVersions": "0+",// - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "ResultsTopics", "typeversions": "[]DeleteStateResult0+", "versionstype": "0+[]TopicMetadata", "about": "The list of deletetopic resultsmetadata.", "fields": [ { "name": "TopicId", "typeversions": "uuid0+", "versionstype": "0+uuid", "about": "The topic identifierid." }, { "name": "PartitionsTopicName", "typeversions": "[]PartitionResult0+", "versionstype": "0+string", "about" : "The results for the partitionstopic name.", "fields": [ }, { "name": "PartitionNumPartitions", "typeversions": "int320+", "versionstype": "0+int32", "about": "The partition index number of partitions of the topic." }, { "name": "ErrorCodePartitionMetadata", "typeversions": "int160+", "versionstype": "0+[]PartitionMetadata", "about": "The error code, or 0 if there was no error." },Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [ { "name": "Partition", "ErrorMessageversions": "0+", "type": "string", "versions"int32", "about": "The partition number." }, { "name": "0+Racks", "nullableVersionsversions": "0+", "defaulttype": "null[]string", "about": "The errorset message,of orracks nullthat ifthe therepartition wasis nomapped errorto." } ]} ]} ] } |
ReadShareGroupStateSummary API
The ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
ShareGroupMemberMetadataKey
Code Block |
---|
{ "apiKey": NN, "type": "requestdata", "listeners": ["broker"], "name": "ReadShareGroupStateSummaryRequestShareGroupMemberMetadataKey", "validVersions": "010", "flexibleVersions": "0+none", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+10", "about": "The group identifierid." }, { "name": "TopicsMemberId", "type": "[]ReadStateSummaryDatastring", "versions": "0+10", "about": "The member id." } ] } |
ShareGroupMemberMetadataValue
Code Block |
---|
{ "type": "data for the topics.",", "name": "ShareGroupMemberMetadataValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "TopicIdRackId", "typeversions": "uuid0+", "versionsnullableVersions": "0+", "type": "string", "about": "The (optional) topicrack identifierid." }, { "name": "PartitionsClientId", "typeversions": "[]PartitionData0+", "versionstype": "0+string", "about": "The data for the partitions.", "fieldsabout": [ "The client id." }, { "name": "PartitionClientHost", "typeversions": "int320+", "versionstype": "0+string", "about": "The partitionclient indexhost." }, { "name": "LeaderEpochSubscribedTopicNames", "typeversions": "int320+", "versionstype": "0+[]string", "about": "The leaderlist epochof ofsubscribed thetopic share-partitionnames." } ]} ]} ] } |
...
ShareGroupTargetAssignmentMetadataKey
Code Block |
---|
{ "apiKey": NN, "type": "responsedata", "name": "ReadShareGroupStateSummaryResponseShareGroupTargetAssignmentMetadataKey", "validVersions": "012", "flexibleVersions": "0+none", // - NOT_COORDINATOR (version 0+) "fields": [ // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_LEADER_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]ReadStateSummaryResult", "versions": "0+", "about": "The read results",{ "name": "GroupId", "type": "string", "versions": "12", "about": "The group id." } ] } |
ShareGroupTargerAssignmentMetadataValue
Code Block |
---|
{ "type": "data", "name": "ShareGroupTargetAssignmentMetadataValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "TopicIdAssignmentEpoch", "typeversions": "uuid0+", "versionstype": "0+int32", "about": "The topicassignment identifierepoch." }, ] } |
ShareGroupTargetAssignmentMemberKey
Code Block |
---|
{ "type { "name": "Partitionsdata", "typename": "[]PartitionResultShareGroupTargetAssignmentMemberKey", "versionsvalidVersions": "0+13", "aboutflexibleVersions" : "The results for the partitions.none", "fields": [ { "name": "PartitionGroupId", "type": "int32string", "versions": "0+13", "about": "The partitiongroup indexid." }, { "name": "ErrorCodeMemberId", "type": "int16string", "versions": "0+13", "about": "The error code, or 0 if there was no errormember id." }, { "name": "ErrorMessage",] } |
ShareGroupTargetAssignmentMemberValue
Code Block |
---|
{ "type": "stringdata", "versionsname": "0+ShareGroupTargetAssignmentMemberValue", "nullableVersionsvalidVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." } flexibleVersions": "0+", "fields": [ { "name": "StateEpochTopicPartitions", "typeversions": "int320+", "versionstype": "0+[]TopicPartition", "about": "The state epoch of the share-partitionassigned partitions." }, "fields": [ { "name": "StartOffsetTopicId", "typeversions": "int640+", "versionstype": "0+uuid" }, { "name": "Partitions", "aboutversions": "0+"The share-partition start offset." } ], "type": "[]int32" } ]} ] } |
Records
This section describes the new record types.
Group metadata
In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets
topic.
For each share group, a single ShareGroupMetadata record is written. When the group is deleted, a tombstone record is written.
For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.
There is also a ShareGroupPartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
...
} |
ShareGroupCurrentMemberAssignmentKey
Code Block |
---|
{ "type": "data", "name": "ShareGroupMetadataKeyShareGroupCurrentMemberAssignmentKey", "validVersions": "1114", "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": "1114", "about": "The group id." } ] } |
ShareGroupMetadataValue
Code Block |
---|
{ "type": "data", "name": "ShareGroupMetadataValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "EpochMemberId", "type": "int32string", "versions": "0+14", "about": "The groupmember epochid." } ] } |
...
ShareGroupCurrentMemberAssignmentValue
Code Block |
---|
{ "type": "data", "name": "ShareGroupMemberMetadataKeyShareGroupCurrentMemberAssignmentValue", "validVersions": "100", "flexibleVersions": "none0+", "fields": [ { "name": "GroupIdMemberEpoch", "typeversions": "string0+", "versionstype": "10int32", "about": "The group id current member epoch that is expected from the member in the heartbeat request." }, { "name": "MemberIdPreviousMemberEpoch", "typeversions": "string0+", "versionstype": "10int32", "about": "The member idIf the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." }, ] } |
ShareGroupMemberMetadataValue
Code Block |
---|
{ "typename": "dataState", "nameversions": "ShareGroupMemberMetadataValue0+", "validVersionstype": "0int8", "flexibleVersionsabout": "0+", "fields": [The member state. See MemberState for the possible values." }, { "name": "RackIdAssignedPartitions", "versions": "0+", "nullableVersions": "0+", "type": "string[]TopicPartitions", "about": "The (optionalpartitions assigned to (or owned by) rackthis idmember." } ], "commonStructs": [ { "name": "ClientIdTopicPartitions", "versions": "0+", "typefields": "string",[ { "aboutname": "The client id." }, { "nameTopicId", "type": "ClientHostuuid", "versions": "0+", "type": "string", "about": "The clienttopic hostId." }, { "name": "SubscribedTopicNamesPartitions", "versionstype": "0+[]int32", "typeversions": "[]string0+", "about": "The partition list of subscribed topic names." Ids." } ]} ] } |
...
ShareGroupStatePartitionMetadataKey
Code Block |
---|
{ "type": "data", "name": "ShareGroupPartitionMetadataKeyShareGroupStatePartitionMetadataKey", "validVersions": "915", "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": "915", "about": "The group id." } ] } |
...
ShareGroupStatePartitionMetadataValue
Code Block |
---|
{ "type": "data", "name": "ShareGroupPartitionMetadataValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo", "about": "The topics with initialized share-group state." }, { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo", "about": "The topics whose share-group state is being deleted." } ], "commonStructs": [ { "name": "TopicPartitionsInfo", "versions": "0+", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "TopicName", "type": "string", "versions": "0+", "about": "The topic name." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]}, { "name": "TopicInfo", "versions": "0+", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "TopicName", "type": "string", "versions": "0+", "about": "The topic name." } ]} ] } |
...
Code Block |
---|
{ "type": "data", "name": "ShareSnapshotKey", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": "0", "about": "The group id." }, { "name": "TopicId", "type": "uuid", "versions": "0", "about": "The topic id." }, { "name": "Partition", "type": "int32", "versions": "0", "about": "The partition index." } ] } |
...
Code Block |
---|
{ "type": "data", "name": "ShareUpdateKey", "validVersions": "1", "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": "01", "about": "The group id." }, { "name": "TopicId", "type": "uuid", "versions": "01", "about": "The topic id." }, { "name": "Partition", "type": "int32", "versions": "01", "about": "The partition index." } ] } |
...
Metric Name | Type | Group | Tags | Description | JMX Bean | ||||
---|---|---|---|---|---|---|---|---|---|
group-count | Gauge | group-coordinator-metrics |
| The total number of share groups managed by group coordinator. |
| ||||
rebalance (rebalance-rate and rebalance-count) | Meter | group-coordinator-metrics |
| The total number of share group rebalances count and rate. |
| num-partitions | Gauge | ||
group-coordinator-metrics |
| The number of share partitions managed by group coordinator. |
| group-count | Gauge | group-coordinator-metrics |
| The number of share groups in respective state. | kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={emptyEmpty|stableStable|deadDead} |
share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) | Meter | share-group-metrics | The total number of offsets acknowledged for share groupsacknowledgement requests. |
| |||||
record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count) | Meter | share-group-metrics |
| The number of records acknowledged per acknowledgement type. |
| ||||
partition-load-time (partition-load-time-avg and partition-load-time-max) | Meter | share-group-metrics | The time taken to load the share partitions. |
| |||||
partition-load-time (partition-load-time-avg and partition-load-time-max) | Meter | share-coordinator-metrics | The time taken in milliseconds to load the share-group state from the share-group state partitions. |
| |||||
thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg) | Meter | share-coordinator-metrics | The fraction of time the share coordinator thread is idle. |
| |||||
write (write-rate and write-total) | Meter | share-coordinator-metrics | The number of share-group state write calls per second. |
| |||||
write-latency (write-latency-avg and write-latency-totalmax) | Meter | share-coordinator-metrics | The time taken for a share-group state write call, including the time to write to the share-group state topic. |
| |||||
num-partitions | Gauge | share-coordinator-metrics | The number of partitions in the share-state topic. |
|
...