...
Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.
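The epoch comparison above can be illustrated with a small sketch. The class, its field names and the round-robin `assign` helper are hypothetical stand-ins chosen for illustration; they are not the coordinator's real data structures or the real server-side assignor.

```python
from dataclasses import dataclass, field


@dataclass
class ShareGroupState:
    """Hypothetical in-memory view of a share group (names are illustrative)."""
    group_epoch: int = 0
    assignment_epoch: int = 0
    members: list = field(default_factory=list)
    partitions: list = field(default_factory=list)
    target_assignment: dict = field(default_factory=dict)


def assign(members, partitions):
    """Placeholder assignor: spreads partitions over members round-robin."""
    assignment = {m: [] for m in members}
    for i, p in enumerate(partitions):
        if members:
            assignment[members[i % len(members)]].append(p)
    return assignment


def maybe_recompute(group):
    """Recompute the target assignment only when the group epoch is ahead."""
    if group.group_epoch > group.assignment_epoch:
        group.target_assignment = assign(group.members, group.partitions)
        # The assignment epoch becomes the epoch of the metadata that was used.
        group.assignment_epoch = group.group_epoch
        return True
    return False
```

Calling `maybe_recompute` a second time without a further group epoch bump does nothing, which is the property the epoch comparison is meant to give.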
...
For a share group, the group coordinator persists seven kinds of records:
- ShareGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
- ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
- ShareGroupPartitionMetadata - this is written to persist the metadata about the partitions the group is assigning to the members.
- ShareGroupTargetAssignmentMetadata - this is written to persist the assignment epoch.
- ShareGroupTargetAssignmentMember - this is written to persist the target assignment for a member.
- ShareGroupCurrentMemberAssignment - this is written to persist the current assignment for a member, and also to maintain the sequence of member epochs.
- ShareGroupStatePartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.
When the group coordinator fails over, the newly elected coordinator replays the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over, along with the list of members and their assignments.
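A minimal sketch of what replaying state can look like, assuming the records are keyed and that a record with no value acts as a tombstone. The key shape used here (record kind, group ID, member ID) is an assumption for illustration, not the actual record key format.

```python
def replay(records):
    """Rebuild state from (key, value) records in log order.

    A later record for the same key replaces the earlier one, and a value of
    None is treated as a tombstone that removes the key.
    """
    state = {}
    for key, value in records:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state
```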
...
The group coordinator is responsible for asking the share coordinator to initialize and delete the durable share-partition state. The group coordinator uses the ShareGroupStatePartitionMetadata record to keep track of which share-partitions have been initialized.
...
- When a topic is added to the set of subscribed topics for a share group and is not yet in the ShareGroupStatePartitionMetadata record
- When partitions are added to a topic which is already in the ShareGroupStatePartitionMetadata record
- When KafkaAdmin.alterShareGroupOffsets is used to reset the SPSO for a share-partition
The state is deleted in the following cases:
- When a topic in the ShareGroupStatePartitionMetadata record is deleted
- When KafkaAdmin.deleteShareGroupOffsets is used to delete state for a share-partition, most likely as a result of an administrator cleaning up a topic which is no longer in use by this share group
- When the share group is deleted
The group coordinator periodically reconciles its "hard" state in the ShareGroupStatePartitionMetadata record with the "soft" state in the cluster metadata. This is how it observes relevant changes to topics, such as topic deletion. The ShareGroupStatePartitionMetadata record contains the set of topic-partitions which are known to be initialized.
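The reconciliation can be pictured as a set comparison between the persisted record and the cluster metadata. The sketch below is a simplification under assumed inputs: the function name, the dictionary shapes and the idea of passing the subscription set in directly are all illustrative choices, not the coordinator's actual interfaces.

```python
def reconcile(initialized, deleting, cluster_topics, subscribed):
    """Compare persisted ("hard") state with cluster metadata ("soft") state.

    initialized:    dict topic -> set of partition indexes known to be initialized
    deleting:       set of topics whose deletion is already in progress
    cluster_topics: dict topic -> partition count, taken from cluster metadata
    subscribed:     set of topics the group currently subscribes to
    Returns (to_initialize, to_delete).
    """
    to_initialize = {}
    for topic in sorted(subscribed):
        if topic not in cluster_topics or topic in deleting:
            continue
        wanted = set(range(cluster_topics[topic]))
        missing = wanted - initialized.get(topic, set())
        if missing:
            # Covers both a newly subscribed topic and added partitions.
            to_initialize[topic] = sorted(missing)
    # Topics in the persisted record that no longer exist need their state
    # deleted, as do topics whose deletion was started but not finished.
    vanished = {t for t in initialized if t not in cluster_topics}
    to_delete = sorted(vanished | set(deleting))
    return to_initialize, to_delete
```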
...
- Sends the InitializeShareGroupState RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
- When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic added.
...
- Sends the InitializeShareGroupState RPC to the share coordinators for all of the new partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
- When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the array of partitions for the topic increased.
...
- If the topic still exists, it writes a ShareGroupStatePartitionMetadata record with the topic added to the DeletingTopics array. This enables group coordinator failover to continue the deletion. If the topic does not exist, the requirement to continue deletion can be inferred by the non-existence of the topic.
- Sends the DeleteShareGroupState RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
- When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic removed.
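The deletion steps above are written so that they can be re-run after a failure. A rough sketch of that shape follows, assuming the record is modelled as a dictionary with `InitializedTopics` and `DeletingTopics` entries and that `send_delete` is a hypothetical callable standing in for the DeleteShareGroupState RPC.

```python
def delete_topic_state(record, topic, partitions, topic_exists, send_delete):
    """Run the deletion steps once; returns True when deletion completed.

    record:      dict with 'InitializedTopics' (topic -> partitions) and
                 'DeletingTopics' (list of topics), a simplified stand-in
                 for the persisted record
    send_delete: hypothetical callable (topic, partition) -> bool that
                 returns True on a successful response
    """
    if topic_exists and topic not in record["DeletingTopics"]:
        # Persist the intent so a new coordinator can continue after failover.
        record["DeletingTopics"].append(topic)
    results = [send_delete(topic, p) for p in partitions]
    if not all(results):
        return False  # safe to call again; the step is repeatable
    # All partitions confirmed: write the record with the topic removed.
    record["InitializedTopics"].pop(topic, None)
    if topic in record["DeletingTopics"]:
        record["DeletingTopics"].remove(topic)
    return True
```

Because the intent is recorded before any RPC is sent, a caller that sees `False` can invoke the function again with the same arguments.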
...
Operation | Involves | Notes |
---|---|---|
Create share group | Group coordinator | This occurs as a side-effect of the initial a) The group coordinator serves the b) For each share-partition being initialized, the group coordinator sends an c) The share coordinator serves the d) Back in the group coordinator, it writes a ShareGroupStatePartitionMetadata record on the |
Assign a share-partition | Group coordinator and optionally share coordinator | When a topic-partition is eligible to be assigned to a member of a share group for the first time, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupStatePartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group. |
List share groups | Group coordinator | |
Describe share group | Group coordinator | |
List share group offsets | Group coordinator and share coordinator | The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupStateSummary request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share-coordinator when it is next required. |
Alter share group offsets | Group coordinator and share coordinator | Only empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata record, and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic. If the partition was not previously initialized, the group coordinator writes an updated ShareGroupStatePartitionMetadata record. |
Delete share group offsets | Group coordinator and share coordinator | This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupStatePartitionMetadata record to the __consumer_offsets topic adding the topic to the DeletingTopics array and removing it from the InitializedTopics array. This enables an interrupted deletion to be completed. The group coordinator sends a DeleteShareGroupState request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes an updated ShareGroupStatePartitionMetadata record to the __consumer_offsets topic to complete the deletion of the topic from the share group. |
Delete share group | Group coordinator and share coordinator | Only empty share groups can be deleted. If the share group has any topics with initialized state, it writes a ShareGroupStatePartitionMetadata record to the |
...
Option | Description |
---|---|
--all-topics | Consider all topics assigned to a group in the `reset-offsets` process. |
--bootstrap-server <String: server to connect to> | REQUIRED: The server(s) to connect to. |
--command-config <String: command config property file> | Property file containing configs to be passed to Admin Client. |
--delete | Delete the share groups passed in. For instance --group g1 --group g2 |
--delete-offsets | Delete offsets of share group. Supports one share group at a time, and multiple topics. |
--describe | Describe share group, members and offset information. |
--dry-run | Only show results without executing changes on share groups. Supported operations: reset-offsets. |
--execute | Execute operation. Supported operations: reset-offsets. |
--group <String: share group> | The share group we wish to act on. |
--help | Print usage information. |
--list | List all share groups. |
--members | Describe members of the group. This option may be used with the '--describe' option only. |
--offsets | Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action and may be used with the '--describe' option only. |
--reset-offsets | Reset offsets of share group. Supports one share group at a time, and instances must be inactive. Has 2 execution options: '--dry-run' (the default) and '--execute'. Has 3 reset options: '--to-earliest', '--to-latest' and '--to-datetime'. |
--state [String] | When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Stable' and 'Dead'. |
--timeout <Long: timeout (ms)> | The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000) |
--to-datetime <String: datetime> | Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'. |
--to-earliest | Reset offsets to earliest offset. |
--to-latest | Reset offsets to latest offset. |
--topic <String: topic> | The topic whose share group information should be deleted or included in the reset offset process. When resetting offsets, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included. |
--version | Display Kafka version. |
...
Configuration | Description | Values |
---|---|---|
group.coordinator.rebalance.protocols | The list of enabled rebalance protocols. (Existing configuration) | This will be added to the default value of this configuration property once this feature is complete. |
group.share.delivery.count.limit | The maximum number of delivery attempts for a record delivered to a share group. | Default 5, minimum 2, maximum 10 |
group.share.record.lock.duration.ms | Share-group record acquisition lock duration in milliseconds. | Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds) |
group.share.min.record.lock.duration.ms | Share-group record acquisition lock minimum duration in milliseconds. | Default 15000 (15 seconds), minimum 1000 (1 second), maximum 30000 (30 seconds) |
group.share.max.record.lock.duration.ms | Share-group record acquisition lock maximum duration in milliseconds. | Default 60000 (60 seconds), minimum 30000 (30 seconds), maximum 3600000 (1 hour) |
group.share.partition.max.record.locks | Share-group record lock limit per share-partition. | Default 200, minimum 100, maximum 10000 |
group.share.session.timeout.ms | The timeout to detect client failures when using the group protocol. | Default 45000 (45 seconds) |
group.share.min.session.timeout.ms | The minimum session timeout. | Default 45000 (45 seconds) |
group.share.max.session.timeout.ms | The maximum session timeout. | Default 60000 (60 seconds) |
group.share.heartbeat.interval.ms | The heartbeat interval given to the members. | Default 5000 (5 seconds) |
group.share.min.heartbeat.interval.ms | The minimum heartbeat interval. | Default 5000 (5 seconds) |
group.share.max.heartbeat.interval.ms | The maximum heartbeat interval. | Default 15000 (15 seconds) |
group.share.max.groups | The maximum number of share groups. | Default 10, minimum 1, maximum 100 |
group.share.max.size | The maximum number of consumers that a single share group can accommodate. | Default 200, minimum 10, maximum 1000 |
group.share.assignors | The server-side assignors as a list of full class names. The list must contain only a single entry which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors. | A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor" |
share.coordinator.state.topic.num.partitions | The number of partitions for the share-group state topic (should not change after deployment). | Default 50 |
share.coordinator.state.topic.replication.factor | The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement. | Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use) |
share.coordinator.state.topic.segment.bytes | The log segment size for the share-group state topic. | Default 104857600 |
share.coordinator.state.topic.min.isr | Overridden min.insync.replicas for the share-group state topic. | Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use) |
share.coordinator.state.topic.compression.codec | Compression codec for the share-group state topic. | Default 0 (NONE) |
share.coordinator.append.linger.ms | The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk. | Default 10, minimum 0 |
share.coordinator.load.buffer.size | Batch size for reading from the share-group state topic when loading state information into the cache (soft-limit, overridden if records are too large). | Default 5 * 1024 * 1024 (5242880), minimum 1 |
share.coordinator.snapshot.update.records.per.snapshot | The number of update records the share coordinator writes between snapshot records. | Default 500, minimum 0 |
share.coordinator.threads | The number of threads used by the share coordinator. | Default 1, minimum 1 |
share.coordinator.write.timeout.ms | The duration in milliseconds that the share coordinator will wait for all replicas of the share-group state topic to receive a write. | Default 5000, minimum 1 |
share.fetch.purgatory.purge.interval.requests | The purge interval (in number of requests) of the share fetch request purgatory | Default 1000 |
Group configuration
The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.
...
- FindCoordinator - for finding coordinators, to support share coordinators
- ListGroups - for listing groups, to support listing share groups
Access control
This table gives the ACLs required for the new APIs.
...
Version 6 is the same as version 5.
ListGroups API
This KIP introduces version 6
Request schema
Version 6 adds support for share groups.
Code Block |
---|
{ "apiKey": 16, "type": "request", "listeners": ["zkBroker", "broker"], "name": "ListGroupsRequest", // Version 1 and 2 are the same as version 0. // // Version 3 is the first flexible version. // // Version 4 adds the StatesFilter field (KIP-518). // // Version 5 adds the TypesFilter field (KIP-848). // // Version 6 adds support for share groups. "validVersions": "0-6", "flexibleVersions": "3+", "fields": [ { "name": "StatesFilter", "type": "[]string", "versions": "4+", "about": "The states of the groups we want to list. If empty, all groups are returned with their state." }, { "name": "TypesFilter", "type": "[]string", "versions": "5+", "about": "The types of the groups we want to list. If empty, all groups are returned with their type." } ] } |
Response schema
Version 6 is the same as version 5.
ShareGroupHeartbeat API
The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.
Request schema
The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeat.
Code Block |
---|
{ "apiKey": 76, "type": "request", "listeners": ["broker"], "name": "ShareGroupHeartbeatRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." }, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The current member epoch; 0 to join the group; -1 to leave the group." }, { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." } ] } |
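The rule stated before the schema (send every field on join or after an error, otherwise only what changed) might be applied on the client side roughly as follows. This is a sketch only: the function name, the `last_sent` parameter and the choice to sort topic names are assumptions, and the request is modelled as a plain dictionary using the field names from the schema.

```python
def build_heartbeat(group_id, member_id, member_epoch, rack_id, topics, last_sent=None):
    """Build a ShareGroupHeartbeatRequest body as a plain dict.

    last_sent is the previously sent (rack_id, topics) pair, or None when the
    member is joining or recovering from an error and must send every field.
    """
    request = {
        "GroupId": group_id,
        "MemberId": member_id,
        "MemberEpoch": member_epoch,
        "RackId": None,                 # null means "unchanged"
        "SubscribedTopicNames": None,   # null means "unchanged"
    }
    if last_sent is None:
        request["RackId"] = rack_id
        request["SubscribedTopicNames"] = sorted(topics)
        return request
    last_rack, last_topics = last_sent
    if rack_id != last_rack:
        request["RackId"] = rack_id
    if set(topics) != set(last_topics):
        request["SubscribedTopicNames"] = sorted(topics)
    return request
```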
Response schema
The group coordinator will only send the Assignment field when it changes.
...
The ShareGroupDescribe API is used to describe share groups.
Request schema
Code Block |
---|
{ "apiKey": 77, "type": "request", "listeners": ["broker"], "name": "ShareGroupDescribeRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId", "about": "The ids of the groups to describe" }, { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+", "about": "Whether to include authorized operations." } ] } |
Response schema
Code Block |
---|
{ "apiKey": 77, "type": "response", "name": "ShareGroupDescribeResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - INVALID_REQUEST (version 0+) // - INVALID_GROUP_ID (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+", "about": "Each described group.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The describe error, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-level error message, or null if there was no error." }, { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group ID string." }, { "name": "GroupState", "type": "string", "versions": "0+", "about": "The group state string, or the empty string." }, { "name": "GroupEpoch", "type": "int32", "versions": "0+", "about": "The group epoch." }, { "name": "AssignmentEpoch", "type": "int32", "versions": "0+", "about": "The assignment epoch." }, { "name": "AssignorName", "type": "string", "versions": "0+", "about": "The selected assignor." }, { "name": "Members", "type": "[]Member", "versions": "0+", "about": "The members.", "fields": [ { "name": "MemberId", "type": "string", "versions": "0+", "about": "The member ID." }, { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The member rack ID." 
}, { "name": "MemberEpoch", "type": "int32", "versions": "0+", "about": "The current member epoch." }, { "name": "ClientId", "type": "string", "versions": "0+", "about": "The client ID." }, { "name": "ClientHost", "type": "string", "versions": "0+", "about": "The client host." }, { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName", "about": "The subscribed topic names." }, { "name": "Assignment", "type": "Assignment", "versions": "0+", "about": "The current assignment." } ]}, { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648", "about": "32-bit bitfield to represent authorized operations for this group." } ] } ], "commonStructs": [ { "name": "TopicPartitions", "versions": "0+", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic ID." }, { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]}, { "name": "Assignment", "versions": "0+", "fields": [ { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "about": "The assigned topic-partitions to the member." } ]} ] } |
...
- If acknowledgements are being made for a partition and no records should be fetched, PartitionMaxBytes should be set to zero.
- If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the Topics array with PartitionMaxBytes set to zero AND the partition is included in ForgottenTopicsData.
- If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the Topics array and ShareSessionEpoch is set to -1. No data will be fetched and it is not necessary to include the partition in ForgottenTopicsData.
- If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the AcknowledgeErrorCode in the response will be set to the same value for all partitions which had piggybacked acknowledgements.
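The first two cases in the list above can be sketched as a helper that builds the per-partition entry of a ShareFetch request. The function name and parameters are hypothetical; only the field names come from the request schema, and the final-request case (ShareSessionEpoch of -1) is left out because it is set at the top level of the request.

```python
def fetch_partition_entry(partition, ack_batches, fetch_bytes, removing=False):
    """Return (topics_entry, forgotten) for one partition of a ShareFetch request.

    fetch_bytes is the byte budget when records should be fetched, or 0 when
    the request only carries acknowledgements for this partition.
    """
    entry = {
        "PartitionIndex": partition,
        # Nothing is fetched for a partition that is leaving the session.
        "PartitionMaxBytes": 0 if removing else fetch_bytes,
        "AcknowledgementBatches": ack_batches,
    }
    # A partition leaving the session is also listed in ForgottenTopicsData.
    forgotten = [partition] if removing else []
    return entry, forgotten
```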
Request schema
For the AcknowledgementBatches of each topic-partition, the FirstOffsets must be in ascending order and the ranges must be non-overlapping.
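As an illustration of this constraint, a client or broker could check a list of batches with something like the following sketch. The function name is hypothetical, and each batch is modelled as a dictionary with the inclusive FirstOffset and LastOffset fields from the schema.

```python
def validate_batches(batches):
    """Check that batches are ordered by FirstOffset and do not overlap.

    Each batch is a dict with inclusive 'FirstOffset' and 'LastOffset'.
    """
    previous_last = None
    for batch in batches:
        first, last = batch["FirstOffset"], batch["LastOffset"]
        if last < first:
            return False  # an inverted range is never valid
        if previous_last is not None and first <= previous_last:
            return False  # out of order, or overlapping the previous range
        previous_last = last
    return True
```

Because LastOffset is inclusive, a batch that starts at the previous batch's LastOffset counts as overlapping.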
...
Code Block |
---|
{ "apiKey": 78, "type": "request", "listeners": ["broker"], "name": "ShareFetchRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The member ID." }, { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+", "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." }, { "name": "MaxWaitMs", "type": "int32", "versions": "0+", "about": "The maximum time in milliseconds to wait for the response." }, { "name": "MinBytes", "type": "int32", "versions": "0+", "about": "The minimum bytes to accumulate in the response." }, { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true, "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, { "name": "Topics", "type": "[]FetchTopic", "versions": "0+", "about": "The topics to fetch.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+", "about": "The partitions to fetch.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+", "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." 
}, { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+", "about": "Record batches to acknowledge.", "fields": [ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "First offset of batch of records to acknowledge."}, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "Last offset (inclusive) of batch of records to acknowledge."}, { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+", "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."} ]} ]} ]}, { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false, "about": "The partitions to remove from this share session.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions indexes to forget." } ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": 78, "type": "response", "name": "ShareFetchResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors for ErrorCode and AcknowledgeErrorCode: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - TOPIC_AUTHORIZATION_FAILED (version 0+) // - SHARE_SESSION_NOT_FOUND (version 0+) // - INVALID_SHARE_SESSION_EPOCH (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - NOT_LEADER_OR_FOLLOWER (version 0+) // - UNKNOWN_TOPIC_ID (version 0+) // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode // - KAFKA_STORAGE_ERROR (version 0+) // - CORRUPT_MESSAGE (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true, "about": "The top-level response error code." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-level error message, or null if there was no error." }, { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+", "about": "The response topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The topic partitions.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The fetch error code, or 0 if there was no fetch error." 
}, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The fetch error message, or null if there was no fetch error." }, { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+", "about": "The acknowledge error code, or 0 if there was no acknowledge error." }, { "name": "AcknowledgeErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The acknowledge error message, or null if there was no acknowledge error." }, { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [ { "name": "LeaderId", "type": "int32", "versions": "0+", "about": "The ID of the current leader or -1 if the leader is unknown." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch." } ]}, { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}, { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields": [ {"name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."}, {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."}, {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."} ]} ]} ]}, { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+", "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [ { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." }, { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname." }, { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port." 
}, { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The rack of the node, or null if it has not been assigned to a rack." } ]} ] } |
...
The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.
Request schema
For the AcknowledgementBatches of each topic-partition, the FirstOffsets must be in ascending order and the ranges must be non-overlapping.
...
Code Block |
---|
{ "apiKey": 79, "type": "request", "listeners": ["broker"], "name": "ShareAcknowledgeRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId", "about": "The group identifier." }, { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The member ID." }, { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+", "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." }, { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+", "about": "The topics containing records to acknowledge.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+", "about": "The partitions containing records to acknowledge.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+", "about": "Record batches to acknowledge.", "fields": [ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "First offset of batch of records to acknowledge."}, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "Last offset (inclusive) of batch of records to acknowledge."}, { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+", "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."} ]} ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": 79, "type": "response", "name": "ShareAcknowledgeResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - TOPIC_AUTHORIZATION_FAILED (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - SHARE_SESSION_NOT_FOUND (version 0+) // - INVALID_SHARE_SESSION_EPOCH (version 0+) // - NOT_LEADER_OR_FOLLOWER (version 0+) // - UNKNOWN_TOPIC_ID (version 0+) // - INVALID_RECORD_STATE (version 0+) // - KAFKA_STORAGE_ERROR (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true, "about": "The top level response error code." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The top-level error message, or null if there was no error." }, { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+", "about": "The response topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The topic partitions.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." 
}, { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [ { "name": "LeaderId", "type": "int32", "versions": "0+", "about": "The ID of the current leader or -1 if the leader is unknown." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch." } ]} ]} ]}, { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+", "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [ { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." }, { "name": "Host", "type": "string", "versions": "0+", "about": "The node's hostname." }, { "name": "Port", "type": "int32", "versions": "0+", "about": "The node's port." }, { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The rack of the node, or null if it has not been assigned to a rack." } ]} ] } |
...
The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The group coordinator serves this API.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "AlterShareGroupOffsetsRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+", "about": "The topics to alter offsets for.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true, "about": "The topic name." }, { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+", "about": "Each partition to alter offsets for.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "AlterShareGroupOffsetsResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - GROUP_NOT_EMPTY (version 0+) // - KAFKA_STORAGE_ERROR (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+", "about": "The results for each topic.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID." }, { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null", "about": "The error message, or null if there was no error." } ]} ]} ] } |
...
The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The group coordinator serves this API.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "DeleteShareGroupOffsetsRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+", "about": "The topics to delete offsets for.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." } ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "DeleteShareGroupOffsetsResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - GROUP_NOT_EMPTY (version 0+) // - KAFKA_STORAGE_ERROR (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Responses", "type": "[]DeleteShareGroupOffsetsResponseTopic", "versions": "0+", "about": "The results for each topic.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null", "about": "The error message, or null if there was no error." } ]} ] } |
...
The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The group coordinator serves this API.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "DescribeShareGroupOffsetsRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The group identifier." }, { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+", "about": "The topics to describe offsets for.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "DescribeShareGroupOffsetsResponse", "validVersions": "0", "flexibleVersions": "0+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - INVALID_REQUEST (version 0+) // - UNKNOWN_SERVER_ERROR (version 0+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+", "about": "The results for each topic.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID." }, { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset."}, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null", "about": "The error message, or null if there was no error." } ]} ]} ] } |
...
The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "InitializeShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about": "The group identifier." }, { "name": "Topics", "type": "[]InitializeStateData", "versions": "0+", "about": "The data for the topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The data for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch of the share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset, or -1 if the start offset is not being initialized." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "InitializeShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]InitializeStateResult", "versions": "0+", "about": "The initialization results", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier" }, { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+", "about": "The results for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." } ]} ]} ] } |
...
The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "ReadShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about": "The group identifier." }, { "name": "Topics", "type": "[]ReadStateData", "versions": "0+", "about": "The data for the topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The data for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The leader epoch of the share-partition." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "ReadShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_LEADER_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]ReadStateResult", "versions": "0+", "about": "The read results", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier" }, { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+", "about": "The results for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch of the share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset, which can be -1 if it is not yet initialized." }, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The first offset of this state batch." }, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this state batch." }, { "name": "DeliveryState", "type": "int8", "versions": "0+", "about": "The delivery state - 0:Available,2:Acked,4:Archived." }, { "name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count." } ]} ]} ]} ] } |
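As an illustration of how a share-partition leader might interpret the `StateBatches` in this response, the sketch below decodes the `DeliveryState` codes listed in the schema and counts offsets per state. The helper name and the dict representation of a batch are hypothetical, and `LastOffset` is assumed to be inclusive.

```python
from enum import IntEnum
from typing import Dict, Iterable, Mapping


class DeliveryState(IntEnum):
    """Delivery state codes as listed in the StateBatch schema."""
    AVAILABLE = 0
    ACKED = 2
    ARCHIVED = 4


def count_offsets_by_state(
    batches: Iterable[Mapping[str, int]],
) -> Dict[DeliveryState, int]:
    """Count how many offsets are in each delivery state.

    Hypothetical helper; assumes LastOffset is inclusive.
    """
    counts = {state: 0 for state in DeliveryState}
    for batch in batches:
        state = DeliveryState(batch["DeliveryState"])  # ValueError on unknown codes
        counts[state] += batch["LastOffset"] - batch["FirstOffset"] + 1
    return counts
```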
...
The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "WriteShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about": "The group identifier." }, { "name": "Topics", "type": "[]WriteStateData", "versions": "0+", "about": "The data for the topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The data for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch of the share-partition." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The leader epoch of the share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset, or -1 if the start offset is not being written." }, { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [ { "name": "FirstOffset", "type": "int64", "versions": "0+", "about": "The first offset of this state batch." }, { "name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this state batch." }, { "name": "DeliveryState", "type": "int8", "versions": "0+", "about": "The delivery state - 0:Available,2:Acked,4:Archived" }, { "name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count." } ]} ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "WriteShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_LEADER_EPOCH (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]WriteStateResult", "versions": "0+", "about": "The write results", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier" }, { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+", "about": "The results for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." } ]} ]} ] } |
...
The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "DeleteShareGroupStateRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about": "The group identifier." }, { "name": "Topics", "type": "[]DeleteStateData", "versions": "0+", "about": "The data for the topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The data for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "DeleteShareGroupStateResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_STATE_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+", "about": "The delete results", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier" }, { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+", "about": "The results for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." } ]} ]} ] } |
...
The ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.
Request schema
Code Block |
---|
{ "apiKey": NN, "type": "request", "listeners": ["broker"], "name": "ReadShareGroupStateSummaryRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "about": "The group identifier." }, { "name": "Topics", "type": "[]ReadStateSummaryData", "versions": "0+", "about": "The data for the topics.", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "about": "The data for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The leader epoch of the share-partition." } ]} ]} ] } |
Response schema
Code Block |
---|
{ "apiKey": NN, "type": "response", "name": "ReadShareGroupStateSummaryResponse", "validVersions": "0", "flexibleVersions": "0+", // - NOT_COORDINATOR (version 0+) // - COORDINATOR_NOT_AVAILABLE (version 0+) // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) // - GROUP_ID_NOT_FOUND (version 0+) // - UNKNOWN_TOPIC_OR_PARTITION (version 0+) // - FENCED_LEADER_EPOCH (version 0+) // - INVALID_REQUEST (version 0+) "fields": [ { "name": "Results", "type": "[]ReadStateSummaryResult", "versions": "0+", "about": "The read results", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier" }, { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+", "about": "The results for the partitions.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "The error message, or null if there was no error." }, { "name": "StateEpoch", "type": "int32", "versions": "0+", "about": "The state epoch of the share-partition." }, { "name": "StartOffset", "type": "int64", "versions": "0+", "about": "The share-partition start offset." } ]} ]} ] } |
...
This section describes the new record types.
Group and assignment metadata
In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets topic.
With the exception of the ShareGroupStatePartitionMetadata record, all of the record types are analogous to those introduced in KIP-848 for consumer groups.
For each share group, a ShareGroupMetadata record is written for the group epoch, and a ShareGroupPartitionMetadata record is written for the topic-partitions being assigned by the group. When the group is deleted, tombstone records are written.
For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.
For each share group, a ShareGroupTargetAssignmentMetadata record is written to record the group epoch used to compute the assignment. For each member, there is a ShareGroupTargetAssignmentMember record which persists the target assignment, and a ShareGroupCurrentMemberAssignment record which persists the current assignment and is also used to keep track of the member epoch.
There is also a ShareGroupStatePartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
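Each key schema in this section carries its own version number, which is how records of different types can share the __consumer_offsets topic. The sketch below maps those key versions (as read from the `validVersions` of the key schemas in this section) to record types and replays a sequence of records, treating a null value as a tombstone. It is an illustrative model only: the tuple representation of a record and the `replay` function are hypothetical, not the group coordinator's code.

```python
from typing import Any, Dict, Iterable, Optional, Tuple

# Key version -> record type, taken from the key schemas in this section.
SHARE_GROUP_RECORD_TYPES = {
    9: "ShareGroupPartitionMetadata",
    10: "ShareGroupMemberMetadata",
    11: "ShareGroupMetadata",
    12: "ShareGroupTargetAssignmentMetadata",
    13: "ShareGroupTargetAssignmentMember",
    14: "ShareGroupCurrentMemberAssignment",
    15: "ShareGroupStatePartitionMetadata",
}

Record = Tuple[int, Tuple[str, ...], Optional[Any]]  # (key version, key fields, value)


def replay(records: Iterable[Record]) -> Dict[Tuple[str, Tuple[str, ...]], Any]:
    """Rebuild in-memory state from records in log order (hypothetical model)."""
    state: Dict[Tuple[str, Tuple[str, ...]], Any] = {}
    for key_version, key, value in records:
        record_type = SHARE_GROUP_RECORD_TYPES[key_version]
        if value is None:
            # Tombstone: the entry for this key is removed.
            state.pop((record_type, key), None)
        else:
            # A later record for the same key replaces the earlier one.
            state[(record_type, key)] = value
    return state
```

Replaying a member's ShareGroupMemberMetadata record followed by its tombstone leaves no entry for that member, which matches the behavior described above for a member leaving the group.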
...
ShareGroupMetadataKey
Code Block |
---|
{
"type": "data",
"name": "ShareGroupMetadataKey",
"validVersions": "11",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "11",
"about": "The group id." }
]
} |
ShareGroupMetadataValue
Code Block |
---|
{
"type": "data",
"name": "ShareGroupMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "type": "int32", "versions": "0+",
"about": "The group epoch." }
]
} |
ShareGroupPartitionMetadataKey
Code Block |
---|
{
"type": "data",
"name": "ShareGroupPartitionMetadataKey",
"validVersions": "9",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "9",
"about": "The group id." }
]
} |
ShareGroupPartitionMetadataValue
Code Block |
---|
{
"type": "data",
"name": "ShareGroupPartitionMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Topics", "versions": "0+", "type": "[]TopicMetadata",
"about": "The list of topic metadata.", "fields": [
{ "name": "TopicId", "versions": "0+", "type": "uuid",
"about": "The topic id." },
{ "name": "TopicName", "versions": "0+", "type": "string",
"about": "The topic name." },
{ "name": "NumPartitions", "versions": "0+", "type": "int32",
"about": "The number of partitions of the topic." },
{ "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
"about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [
{ "name": "Partition", "versions": "0+", "type": "int32",
"about": "The partition number." },
{ "name": "Racks", "versions": "0+", "type": "[]string",
"about": "The set of racks that the partition is mapped to." }
]}
]}
]
} |
ShareGroupMemberMetadataKey
Code Block |
---|
{
"type": "data",
"name": "ShareGroupMemberMetadataKey",
"validVersions": "10",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "10",
"about": "The group id." },
{ "name": "MemberId", "type": "string", "versions": "10",
"about": "The member id." }
]
} |
ShareGroupMemberMetadataValue
Code Block |
---|
{
"type": "data",
"name": "ShareGroupMemberMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
"about": "The (optional) rack id." },
{ "name": "ClientId", "versions": "0+", "type": "string",
"about": "The client id." },
{ "name": "ClientHost", "versions": "0+", "type": "string",
"about": "The client host." },
{ "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string",
"about": "The list of subscribed topic names." }
]
} |
ShareGroupTargetAssignmentMetadataKey
Code Block |
---|
{
"type": "data",
"name": "ShareGroupTargetAssignmentMetadataKey",
"validVersions": "12",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "12",
"about": "The group id." }
]
} |
ShareGroupTargetAssignmentMetadataValue
Code Block |
---|
{
"type": "data",
"name": "ShareGroupTargetAssignmentMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
"about": "The assignment epoch." }
]
} |
ShareGroupTargetAssignmentMemberKey
Code Block |
---|
{
"type": "data",
"name": "ShareGroupTargetAssignmentMemberKey",
"validVersions": "13",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "13",
"about": "The group id." },
{ "name": "MemberId", "type": "string", "versions": "13",
"about": "The member id." }
]
} |
ShareGroupTargetAssignmentMemberValue
Code Block |
---|
{
"type": "data",
"name": "ShareGroupTargetAssignmentMemberValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "TopicPartitions", "versions": "0+", "type": "[]TopicPartition",
"about": "The assigned partitions.", "fields": [
{ "name": "TopicId", "versions": "0+", "type": "uuid" },
{ "name": "Partitions", "versions": "0+", "type": "[]int32" }
]}
]
} |
ShareGroupCurrentMemberAssignmentKey
Code Block |
---|
{
"type": "data",
"name": "ShareGroupCurrentMemberAssignmentKey",
"validVersions": "14",
"flexibleVersions": "none",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "14",
"about": "The group id." },
{ "name": "MemberId", "type": "string", "versions": "14",
"about": "The member id." }
]
} |
...
ShareGroupCurrentMemberAssignmentValue
Code Block |
---|
{
"type": "data",
"name": "ShareGroupCurrentMemberAssignmentValue",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "MemberEpoch", "versions": "0+", "type": "int32",
"about": "The current member epoch that is expected from the member in the heartbeat request." },
{ "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
"about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
{ "name": "State", "versions": "0+", "type": "int8",
"about": "The member state. See MemberState for the possible values." },
{ "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions",
"about": "The partitions assigned to (or owned by) this member." }
],
"commonStructs": [
{ "name": "TopicPartitions", "versions": "0+", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic Id." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partition Ids." }
]}
]
} |
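The purpose of keeping both MemberEpoch and PreviousMemberEpoch can be illustrated with a small check on the epoch carried by a heartbeat. This is a simplified sketch under stated assumptions: the function name and the returned labels are hypothetical, and a real coordinator is likely to apply further conditions before accepting a retry with the previous epoch.

```python
def classify_heartbeat_epoch(
    request_epoch: int, member_epoch: int, previous_member_epoch: int
) -> str:
    """Classify the member epoch sent in a heartbeat (hypothetical helper)."""
    if request_epoch == member_epoch:
        # The member has seen the latest epoch bump.
        return "current"
    if request_epoch == previous_member_epoch:
        # The response carrying the last bump may have been lost,
        # so the member is retrying with the epoch it still holds.
        return "retry"
    # Any other epoch does not match what the coordinator persisted.
    return "fenced"
```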
...
ShareGroupStatePartitionMetadataKey
Code Block |
---|
{ "type": "data", "name": "ShareGroupStatePartitionMetadataKey", "validVersions": "15", "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": "15", "about": "The group id." } ] } |
...
ShareGroupStatePartitionMetadataValue
Code Block |
---|
{ "type": "data", "name": "ShareGroupStatePartitionMetadataValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo", "about": "The topics with initialized share-group state." }, { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo", "about": "The topics whose share-group state is being deleted." } ], "commonStructs": [ { "name": "TopicPartitionsInfo", "versions": "0+", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "TopicName", "type": "string", "versions": "0+", "about": "The topic name." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." } ]}, { "name": "TopicInfo", "versions": "0+", "fields": [ { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." }, { "name": "TopicName", "type": "string", "versions": "0+", "about": "The topic name." } ]} ] } |
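Since this record tracks which share-partitions already have persistent state, one way to picture its use is as a set difference: the partitions being assigned, minus those listed under InitializedTopics, are the ones that still need initialization. The following is a minimal sketch of that idea; the function name and the dict-of-sets representation are hypothetical and do not reflect the group coordinator's actual data structures.

```python
from typing import Dict, Iterable, List


def partitions_needing_initialization(
    assigned: Dict[str, Iterable[int]],
    initialized: Dict[str, Iterable[int]],
) -> Dict[str, List[int]]:
    """Return, per topic ID, the assigned partitions with no initialized state.

    Hypothetical helper: both arguments map a topic ID to partition indexes.
    """
    pending: Dict[str, List[int]] = {}
    for topic_id, partitions in assigned.items():
        missing = set(partitions) - set(initialized.get(topic_id, ()))
        if missing:
            pending[topic_id] = sorted(missing)
    return pending
```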
...
Code Block |
---|
{ "type": "data", "name": "ShareUpdateKey", "validVersions": "1", "flexibleVersions": "none", "fields": [ { "name": "GroupId", "type": "string", "versions": "1", "about": "The group id." }, { "name": "TopicId", "type": "uuid", "versions": "1", "about": "The topic id." }, { "name": "Partition", "type": "int32", "versions": "1", "about": "The partition index." } ] } |
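The three key fields (GroupId, TopicId, Partition) together identify one share-partition. The Python sketch below shows that composite key used to group record values per share-partition. `SharePartitionKey` and `group_by_share_partition` are hypothetical names chosen for illustration, and the string values stand in for record payloads; nothing here describes how the share coordinator actually applies the records.

```python
from typing import NamedTuple
from uuid import UUID


class SharePartitionKey(NamedTuple):
    # Same three fields as the key schema above.
    group_id: str
    topic_id: UUID
    partition: int


def group_by_share_partition(records):
    """Collect values per key, preserving the order in which they were seen."""
    grouped = {}
    for key, value in records:
        grouped.setdefault(key, []).append(value)
    return grouped


topic = UUID(int=1)
log = [
    (SharePartitionKey("g1", topic, 0), "update-a"),
    (SharePartitionKey("g1", topic, 1), "update-b"),
    (SharePartitionKey("g1", topic, 0), "update-c"),
]
for key, values in group_by_share_partition(log).items():
    print(key.partition, values)
```

Because a named tuple is hashable and compares by value, two keys built from the same group id, topic id and partition index land in the same bucket.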
...
Metric Name | Type | Group | Tags | Description | JMX Bean
---|---|---|---|---|---
group-count | Gauge | group-coordinator-metrics | | The total number of share groups managed by the group coordinator. |
rebalance (rebalance-rate and rebalance-count) | Meter | group-coordinator-metrics | | The rate and total count of share group rebalances. |
num-partitions | Gauge | group-coordinator-metrics | | The number of share partitions managed by the group coordinator. |
group-count | Gauge | group-coordinator-metrics | | The number of share groups in each state. | kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={Empty\|Stable\|Dead}
share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) | Meter | share-group-metrics | | The total number of share acknowledgement requests. |
record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count) | Meter | share-group-metrics | | The number of records acknowledged per acknowledgement type. |
partition-load-time (partition-load-time-avg and partition-load-time-max) | Meter | share-group-metrics | | The time taken to load the share partitions. |
partition-load-time (partition-load-time-avg and partition-load-time-max) | Meter | share-coordinator-metrics | | The time taken in milliseconds to load the share-group state from the share-group state partitions. |
thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg) | Meter | share-coordinator-metrics | | The fraction of time the share coordinator thread is idle. |
write (write-rate and write-total) | Meter | share-coordinator-metrics | | The number of share-group state write calls per second. |
write-latency (write-latency-avg and write-latency-max) | Meter | share-coordinator-metrics | | The time taken for a share-group state write call, including the time to write to the share-group state topic. |
num-partitions | Gauge | share-coordinator-metrics | | The number of partitions in the share-state topic. |
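The per-state group-count metric is the only row above that gives a JMX bean name. As a small sketch of how a monitoring script might build that name for one state, the Python helper below fills in the state tag. The function name `group_count_bean` is hypothetical, and the sketch assumes the capitalized state names Empty, Stable and Dead.

```python
# Assumed state names for the "state" tag of the per-state group-count metric.
SHARE_GROUP_STATES = ("Empty", "Stable", "Dead")


def group_count_bean(state):
    """Return the JMX bean name of the share group count for one state."""
    if state not in SHARE_GROUP_STATES:
        raise ValueError("unknown share group state: " + repr(state))
    return (
        "kafka.server:type=group-coordinator-metrics,"
        "name=group-count,protocol=share,state=" + state
    )


for s in SHARE_GROUP_STATES:
    print(group_count_bean(s))
```

Rejecting unknown states up front avoids querying a bean that does not exist, which a JMX client would otherwise report only at lookup time.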
...