...
The following diagram illustrates how these pieces are wired together using the new Kafka protocol RPCs introduced by this KIP.
Relationship with consumer groups
...
This KIP introduces org.apache.kafka.coordinator.group.share.SimpleAssignor. It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records. When calculating a new target assignment, the assignor knows the current assignment (assuming the group coordinator hasn't changed) and can use it to influence the new target assignment. The calculation proceeds as follows:
1. The assignor hashes the member IDs and uses the hashes to map members to partitions. This gives approximately even balance.
2. If any partitions were not assigned any members by (1) and do not already have members in the current assignment, members are assigned round-robin until each partition has at least one member.
3. If any partitions were assigned members by (1) and also have members in the current assignment that were assigned by (2), the members assigned by (2) are removed.
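The three steps above can be sketched in Java for a single subscribed topic. This is a hypothetical illustration, not the SimpleAssignor source: the class and method names are invented, the member-ID hash is assumed to be String.hashCode() reduced with Math.floorMod, and round-robin is assumed to walk the member list in the order given. Step (3) is modelled implicitly, because members from the current assignment are only carried over to partitions that received no members in step (1).

```java
import java.util.*;

/** Hypothetical sketch of the three-step share-group assignment for one topic. */
public final class SimpleAssignorSketch {

    /**
     * @param members           member IDs subscribed to the topic, in round-robin order
     * @param numPartitions     number of partitions in the topic
     * @param currentAssignment current partition -> members assignment (may be empty)
     * @return the target partition -> members assignment
     */
    public static Map<Integer, Set<String>> assign(List<String> members,
                                                   int numPartitions,
                                                   Map<Integer, Set<String>> currentAssignment) {
        Map<Integer, Set<String>> target = new TreeMap<>();
        if (members.isEmpty() || numPartitions <= 0) {
            return target;
        }
        Set<String> live = new HashSet<>(members);

        // Step 1 (assumed hash): map each member ID onto one partition index.
        for (String memberId : members) {
            int partition = Math.floorMod(memberId.hashCode(), numPartitions);
            target.computeIfAbsent(partition, p -> new TreeSet<>()).add(memberId);
        }

        // Step 2: a partition with no hashed member keeps its current (still live) members;
        // if it has none, it receives the next member in round-robin order.
        // Step 3 (implicit): current members are never carried over onto a partition
        // that already received members in step 1.
        int next = 0;
        for (int partition = 0; partition < numPartitions; partition++) {
            if (target.containsKey(partition)) {
                continue;
            }
            Set<String> kept = new TreeSet<>();
            for (String memberId : currentAssignment.getOrDefault(partition, Set.of())) {
                if (live.contains(memberId)) {
                    kept.add(memberId);
                }
            }
            if (kept.isEmpty()) {
                kept.add(members.get(next));
                next = (next + 1) % members.size();
            }
            target.put(partition, kept);
        }
        return target;
    }
}
```

For example, with members "a", "b" and "c" and four partitions, the string hashes 97, 98 and 99 place the members on partitions 1, 2 and 3, so partition 0 is filled round-robin with "a".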
When the number of members is greater than or equal to the number of partitions for a subscribed topic and every partition was assigned at least one member, adding more members to the group simply assigns more members to the partitions. If, however, any partitions were not assigned any members by hashing, adding more members changes the distribution and revokes some previously assigned partitions. Because assignments in share groups are looser than they are in consumer groups, the group coordinator doesn't require members to confirm that they have revoked partitions before those partitions can be assigned to other members. For a brief period while the rebalance is in progress, some partitions may have more consuming members than the assignment requires, but this resolves as the members receive their updated assignments and stop fetching from the revoked partitions.
...
The share-partition leader must detect when the group coordinator is being used to alter the SPSO with a KafkaAdmin.alterShareGroupOffsets request. This only occurs when the group is empty. As a result, when the set of share sessions transitions from 0 to 1, the share-partition leader uses the ReadShareGroupStateSummary RPC to validate its state epoch (this request is much cheaper for the share coordinator to handle than ReadShareGroupState). There are no acquired records at this point, so re-initializing the share-partition leader is non-disruptive. If the state epoch has changed, the share-partition leader issues a ReadShareGroupState RPC to the share coordinator and uses the response to re-initialize.
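A minimal sketch of this check from the share-partition leader's side follows. The class name is hypothetical, and the two IntSupplier callbacks stand in for the ReadShareGroupStateSummary and ReadShareGroupState RPCs; a real broker would issue these asynchronously and handle errors.

```java
import java.util.function.IntSupplier;

/** Hypothetical sketch: re-validating share-partition state when share sessions go from 0 to 1. */
public final class SharePartitionEpochCheck {
    private final IntSupplier readSummaryStateEpoch; // stands in for ReadShareGroupStateSummary
    private final IntSupplier readFullState;         // stands in for ReadShareGroupState; returns the loaded state epoch
    private int stateEpoch;
    private int shareSessions;

    public SharePartitionEpochCheck(int initialStateEpoch,
                                    IntSupplier readSummaryStateEpoch,
                                    IntSupplier readFullState) {
        this.stateEpoch = initialStateEpoch;
        this.readSummaryStateEpoch = readSummaryStateEpoch;
        this.readFullState = readFullState;
    }

    /** Called when a share session that includes this share-partition is created. */
    public void onShareSessionOpened() {
        shareSessions++;
        if (shareSessions == 1) {
            // The SPSO can only be altered while the group is empty, so check the cheap summary first.
            int latestEpoch = readSummaryStateEpoch.getAsInt();
            if (latestEpoch != stateEpoch) {
                // No records are acquired, so reloading the full state is non-disruptive.
                stateEpoch = readFullState.getAsInt();
            }
        }
    }

    /** Called when a share session that includes this share-partition is closed. */
    public void onShareSessionClosed() {
        if (shareSessions > 0) {
            shareSessions--;
        }
    }

    public int stateEpoch() {
        return stateEpoch;
    }
}
```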
...
Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The share coordinator is responsible for share-group persistent state.
The following table explains how the administration operations on share groups work.
Operation | Involves | Notes |
---|---|---|
Create share group | Group coordinator | This occurs as a side-effect of the initial … a) The group coordinator serves the … b) For each share-partition being initialized, the group coordinator sends an … c) The share coordinator serves the … d) Back in the group coordinator, it writes a ShareGroupPartitionMetadata record on the … |
Assign a share-partition | Group coordinator and optionally share coordinator | When a topic-partition is assigned to a member of a share group for the first time, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group. |
List share groups | Group coordinator | |
Describe share group | Group coordinator | |
List share group offsets | Group coordinator and share coordinator | The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupStateSummary request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required. |
Alter share group offsets | Group coordinator and share coordinator | Only empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata record, and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic. If the partition was not previously initialized, the group coordinator writes an updated ShareGroupPartitionMetadata record. |
Delete share group offsets | Group coordinator and share coordinator | This is the administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic, adding the topic to the DeletingTopics array and removing it from the InitializedTopics array. Recording the deletion first enables an interrupted deletion to be completed. The group coordinator then sends a DeleteShareGroupState request to the share coordinator, which writes tombstones to logically delete the state. Finally, the group coordinator writes an updated ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the topic from the share group. |
Delete share group | Group coordinator and share coordinator | Only empty share groups can be deleted. If the share group has any topics with initialized state, the group coordinator writes a ShareGroupPartitionMetadata record to the … |
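The two-write pattern used by "Delete share group offsets" is what makes an interrupted deletion recoverable. The following hypothetical Java model keeps the InitializedTopics and DeletingTopics arrays of the ShareGroupPartitionMetadata record as in-memory sets and uses a Predicate in place of the DeleteShareGroupState RPC; it omits the empty-group check and the actual record writes to __consumer_offsets.

```java
import java.util.*;
import java.util.function.Predicate;

/** Hypothetical in-memory model of the two-phase "Delete share group offsets" flow. */
public final class ShareGroupOffsetsDeletion {
    private final Set<String> initializedTopics = new TreeSet<>(); // models InitializedTopics
    private final Set<String> deletingTopics = new TreeSet<>();    // models DeletingTopics
    private final Predicate<String> deleteShareGroupState;         // models DeleteShareGroupState; true on success

    public ShareGroupOffsetsDeletion(Collection<String> initialized, Predicate<String> deleteShareGroupState) {
        initializedTopics.addAll(initialized);
        this.deleteShareGroupState = deleteShareGroupState;
    }

    /** Returns true if the topic was fully removed from the share group. */
    public boolean deleteTopic(String topic) {
        // First metadata write: move the topic from InitializedTopics to DeletingTopics.
        if (initializedTopics.remove(topic)) {
            deletingTopics.add(topic);
        }
        return completeDeletion(topic);
    }

    /** Re-drives any deletion that was interrupted, for example after a coordinator failover. */
    public void resumeInterruptedDeletions() {
        for (String topic : new ArrayList<>(deletingTopics)) {
            completeDeletion(topic);
        }
    }

    private boolean completeDeletion(String topic) {
        if (!deletingTopics.contains(topic)) {
            return false;
        }
        // Ask the share coordinator to tombstone the state.
        if (!deleteShareGroupState.test(topic)) {
            return false; // the topic stays in DeletingTopics so the deletion can be resumed
        }
        // Second metadata write: drop the topic from DeletingTopics to complete the deletion.
        deletingTopics.remove(topic);
        return true;
    }

    public Set<String> initializedTopics() {
        return Collections.unmodifiableSet(initializedTopics);
    }

    public Set<String> deletingTopics() {
        return Collections.unmodifiableSet(deletingTopics);
    }
}
```

If the share coordinator is unavailable, the topic stays in DeletingTopics, and a later call to resumeInterruptedDeletions() finishes the job once DeleteShareGroupState succeeds.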
...
- ShareGroupHeartbeat - for consumers to form and maintain share groups
- ShareGroupDescribe - for describing share groups
- ShareFetch - for fetching records from share-partition leaders
- ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
- AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
- DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
- DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group
- InitializeShareGroupState - for initializing share-partition state on a share coordinator
- ReadShareGroupState - for reading share-partition state from a share coordinator
- WriteShareGroupState - for writing share-partition state to a share coordinator
- DeleteShareGroupState - for deleting share-partition state from a share coordinator
- ReadShareGroupStateSummary - for reading a summary of the offsets from the share-partition state from a share coordinator
...
```json
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
      "about": "The delete results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}
```
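Because DeleteShareGroupStateResponse reports an error per partition, the caller has to decide partition by partition what to do next. The sketch below is a hypothetical triage, assuming that the three coordinator errors listed in the schema (which are retriable in Kafka, with the standard codes 14, 15 and 16) should be retried after rediscovering the share coordinator, while every other non-zero code is treated as final. The class and record names are invented, and topic IDs are simplified to strings.

```java
import java.util.*;

/** Hypothetical sketch: splitting DeleteShareGroupState partition results into outcome buckets. */
public final class DeleteStateResultTriage {
    // Standard Kafka error codes for the coordinator errors listed in the response schema.
    static final short NONE = 0;
    static final short COORDINATOR_LOAD_IN_PROGRESS = 14;
    static final short COORDINATOR_NOT_AVAILABLE = 15;
    static final short NOT_COORDINATOR = 16;

    /** Mirrors one entry of the Partitions array, with its enclosing TopicId. */
    public record PartitionResult(String topicId, int partition, short errorCode, String errorMessage) {}

    public record Triage(List<PartitionResult> succeeded,
                         List<PartitionResult> retriable,
                         List<PartitionResult> failed) {}

    public static Triage triage(List<PartitionResult> results) {
        List<PartitionResult> succeeded = new ArrayList<>();
        List<PartitionResult> retriable = new ArrayList<>();
        List<PartitionResult> failed = new ArrayList<>();
        for (PartitionResult r : results) {
            switch (r.errorCode()) {
                case NONE -> succeeded.add(r);
                // The coordinator moved or is still loading: find it again and resend.
                case COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR -> retriable.add(r);
                // Everything else (e.g. FENCED_STATE_EPOCH, INVALID_REQUEST) is treated as final here.
                default -> failed.add(r);
            }
        }
        return new Triage(succeeded, retriable, failed);
    }
}
```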
...
ReadShareGroupStateSummary API
The ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the offset information in the share-partition state held by a share coordinator. This is an inter-broker RPC authorized as a cluster action.
...
```json
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateSummaryRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]ReadStateSummaryData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}
```
...
```json
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateSummaryResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateSummaryResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}
```
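For "List share group offsets", the group coordinator has to turn these per-partition results into share-partition start offsets. The following hypothetical Java sketch mirrors the response fields as records and collects StartOffset for every partition whose ErrorCode is 0, reporting the rest separately. The record and method names are assumptions, and topic IDs are simplified to strings.

```java
import java.util.*;

/** Hypothetical sketch: turning ReadShareGroupStateSummary results into per-partition SPSOs. */
public final class StateSummaryToOffsets {
    /** Mirrors one entry of the Partitions array in ReadShareGroupStateSummaryResponse. */
    public record PartitionResult(int partition, short errorCode, String errorMessage,
                                  int stateEpoch, long startOffset) {}

    /** Identifies a share-partition; the topic ID is simplified to a String. */
    public record SharePartitionKey(String topicId, int partition) {}

    /**
     * Collects the share-partition start offset of each partition that reported no error;
     * partitions with a non-zero ErrorCode are reported through errorsOut instead.
     */
    public static Map<SharePartitionKey, Long> startOffsets(Map<String, List<PartitionResult>> resultsByTopic,
                                                            Map<SharePartitionKey, String> errorsOut) {
        Map<SharePartitionKey, Long> offsets = new HashMap<>();
        resultsByTopic.forEach((topicId, partitions) -> {
            for (PartitionResult r : partitions) {
                SharePartitionKey key = new SharePartitionKey(topicId, r.partition());
                if (r.errorCode() == 0) {
                    offsets.put(key, r.startOffset());
                } else {
                    errorsOut.put(key, r.errorMessage() != null ? r.errorMessage() : "error code " + r.errorCode());
                }
            }
        });
        return offsets;
    }
}
```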
...
Metric Name | Type | Group | Tags | Description | JMX Bean |
---|---|---|---|---|---|
last-poll-seconds-ago | Gauge | share-consumer-metrics | | The number of seconds since the last poll() invocation. | |
time-between-poll-avg | Meter | share-consumer-metrics | | The average delay between invocations of poll() in milliseconds. | |
time-between-poll-max | Meter | share-consumer-metrics | | The maximum delay between invocations of poll() in milliseconds. | |
poll-idle-ratio-avg | Meter | share-consumer-metrics | | The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records. | |
heartbeat-response-time-max | Meter | share-consumer-coordinator-metrics | | The maximum time taken to receive a response to a heartbeat request in milliseconds. | |
heartbeat-rate | Meter | share-consumer-coordinator-metrics | | The number of heartbeats per second. | |
heartbeat-total | Meter | share-consumer-coordinator-metrics | | The total number of heartbeats. | |
last-heartbeat-seconds-ago | Gauge | share-consumer-coordinator-metrics | | The number of seconds since the last coordinator heartbeat was sent. | |
rebalance-latency-total | Meter | share-consumer-coordinator-metrics | | The total number of milliseconds spent in rebalances. | |
rebalance-latency-avg | Meter | share-consumer-coordinator-metrics | | The average time taken for a group to complete a rebalance in milliseconds. | |
rebalance-latency-max | Meter | share-consumer-coordinator-metrics | | The maximum time taken for a group to complete a rebalance in milliseconds. | |
rebalance-total | Meter | share-consumer-coordinator-metrics | | The total number of rebalance events. | |
rebalance-rate-per-hour | Meter | share-consumer-coordinator-metrics | | The number of rebalance events per hour. | |
fetch-size-avg | Meter | share-consumer-fetch-manager-metrics | | The average number of bytes fetched per request. | |
fetch-size-max | Meter | share-consumer-fetch-manager-metrics | | The maximum number of bytes fetched per request. | |
bytes-fetched-rate | Meter | share-consumer-fetch-manager-metrics | | The average number of bytes fetched per second. | |
bytes-fetched-total | Meter | share-consumer-fetch-manager-metrics | | The total number of bytes fetched. | |
records-per-request-avg | Meter | share-consumer-fetch-manager-metrics | | The average number of records in each request. | |
records-per-request-max | Meter | share-consumer-fetch-manager-metrics | | The maximum number of records in a request. | |
records-fetched-rate | Meter | share-consumer-fetch-manager-metrics | | The average number of records fetched per second. | |
records-fetched-total | Meter | share-consumer-fetch-manager-metrics | | The total number of records fetched. | |
acknowledgements-send-rate | Meter | share-consumer-fetch-manager-metrics | | The average number of record acknowledgements sent per second. | |
acknowledgements-send-total | Meter | share-consumer-fetch-manager-metrics | | The total number of record acknowledgements sent. | |
acknowledgements-error-rate | Meter | share-consumer-fetch-manager-metrics | | The average number of record acknowledgements that resulted in errors per second. | |
acknowledgements-error-total | Meter | share-consumer-fetch-manager-metrics | | The total number of record acknowledgements that resulted in errors. | |
fetch-latency-avg | Meter | share-consumer-fetch-manager-metrics | | The average time taken for a fetch request. | |
fetch-latency-max | Meter | share-consumer-fetch-manager-metrics | | The maximum time taken for any fetch request. | |
fetch-rate | Meter | share-consumer-fetch-manager-metrics | | The number of fetch requests per second. | |
fetch-total | Meter | share-consumer-fetch-manager-metrics | | The total number of fetch requests. | |
fetch-throttle-time-avg | Meter | share-consumer-fetch-manager-metrics | | The average throttle time in milliseconds. | |
fetch-throttle-time-max | Meter | share-consumer-fetch-manager-metrics | | The maximum throttle time in milliseconds. | |
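To make the poll-timing metrics concrete, here is a hypothetical sketch that derives single samples of last-poll-seconds-ago, time-between-poll and poll-idle-ratio from poll() start and end timestamps. It assumes poll-idle-ratio is the time spent inside poll() divided by that time plus the time the application spent processing records since the previous poll() returned. It is not the Kafka client's metrics code, which aggregates samples over time windows to produce the -avg and -max values.

```java
/** Hypothetical sketch: single samples for the share-consumer poll-timing metrics. */
public final class PollTimingSketch {
    private long lastPollStartMs = -1;
    private long lastPollEndMs = -1;
    private long userCodeMs;                   // time between the previous poll() returning and this one starting
    private long timeBetweenPollsMs;           // one sample for time-between-poll-avg/max
    private double pollIdleRatio = Double.NaN; // one sample for poll-idle-ratio-avg

    /** Called when poll() is entered. */
    public void recordPollStart(long nowMs) {
        timeBetweenPollsMs = lastPollStartMs < 0 ? 0 : nowMs - lastPollStartMs;
        userCodeMs = lastPollEndMs < 0 ? 0 : nowMs - lastPollEndMs;
        lastPollStartMs = nowMs;
    }

    /** Called when poll() returns; idle ratio = time in poll() / (time in poll() + time in user code). */
    public void recordPollEnd(long nowMs) {
        long inPollMs = nowMs - lastPollStartMs;
        long cycleMs = inPollMs + userCodeMs;
        pollIdleRatio = cycleMs == 0 ? 0.0 : (double) inPollMs / cycleMs;
        lastPollEndMs = nowMs;
    }

    /** Whole seconds since the last poll() started, or -1 if poll() has never been called. */
    public long lastPollSecondsAgo(long nowMs) {
        return lastPollStartMs < 0 ? -1 : (nowMs - lastPollStartMs) / 1000;
    }

    public long timeBetweenPollsMs() {
        return timeBetweenPollsMs;
    }

    public double pollIdleRatio() {
        return pollIdleRatio;
    }
}
```

For example, polls running from 0 to 100 ms and from 400 to 500 ms give a time between polls of 400 ms and an idle ratio of 100 / (100 + 300) = 0.25.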
...