Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added share.fetch.purgatory.purge.interval.requests

Table of Contents

Status

Current stateVotingAccepted

Discussion thread: https://lists.apache.org/thread/9wdxthfsbm5xf01y4xvq6qtlg0gq96lq

...

  • The consumers in a share group cooperatively consume records with partitions that may be assigned to multiple consumers

  • The number of consumers in a share group can exceed the number of partitions in a topic

  • Records are acknowledged on an individual basis, although the system is optimized to work in batches for improved efficiency

  • Delivery attempts to consumers in a share group are counted to enable automated handling of unprocessable records

...

The cluster limits the number of records acquired for consumers for each topic-partition in a share group. Once the limit is reached, fetching records will temporarily yield no further records until the number of acquired records reduces, as naturally happens when the locks time out. This limit is controlled by the broker configuration property group.share.recordpartition.lockmax.partitionrecord.limitlocks . By limiting the duration of the acquisition lock and automatically releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.

...

  • It maintains the list of share-group members.

  • It manages the topic-partition assignments for the share-group members using a server-side partition assignor. This KIP defines just one assignor, org.apache.kafka.coordinator.group.assignorshare.SimpleShareAssignorSimpleAssignor, as described below.

A share-partition is a topic-partition with a subscription in a share group. For a topic-partition subscribed in more than one share group, each share group has its own share-partition.

...

The following diagram illustrates how these pieces are wired together using the new Kafka protocol RPCs introduced by this KIP.

Image RemovedImage Added

Relationship with consumer groups

Consumer groups and share groups exist in the same namespace in a Kafka cluster. As a result, if there’s a consumer group with a particular name, you cannot create a share group with the same name, and vice versa. But consumer groups and share groups are quite different in terms of use, so attempts to perform operations for one kind of group on a group of the incorrect type will fail with a GroupIdNotFoundException . The new AdminClient .listGroups  method gives a way of listing groups of all types.

Because consumer groups and share groups are both created automatically on first use, the type of group that is created depends upon how the group ID was first used. As a result, it is helpful to be able to ensure that a group of a particular name can only be created with a particular type. This can be achieved by defining a group configuration property group.type , using the kafka-configs.sh  tool or the AdminClient.incrementalAlterConfigs  method. For example, you could use the following command to ensure that the group ID "G1" is to be used for a share group only.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-name group --entity-name G1 --alter --add-config group.type=share

If a regular Kafka consumer then attempts to use "G1" as a consumer group and the group G1  does not exist or is not a consumer group, the exception InconsistentGroupProtocolException will be thrown.

Note that the group.type  configuration is applied when a new group is created. It is best used to ensure that a group ID is reserved for use with a particular type of group. Changing the configuration when a group already exists will not have an effect until the existing group has been deleted. For production use, choosing a naming convention in advance and using this configuration to enforce the group type is recommended.

Share group membership

This KIP builds upon the new consumer group protocol in KIP-848: The Next Generation of the Consumer Rebalance Protocol.

For production use, choosing a naming convention for groups in advance and using this configuration to enforce the group type is recommended.

A future KIP is planned for the administration of groups, encompassing aspects such as administrative creation of groups, and listing groups of all types including consumer groups, share groups and others such as groups used by Kafka Connect.

Share group membership

This KIP builds upon the new consumer group protocol in KIP-848: The Next Generation of the Consumer Rebalance Protocol.

Share group membership is controlled by the group coordinator. Consumers in a share group use the heartbeat mechanism to join, leave and confirm continued membership of the share group, using the new ShareGroupHeartbeat RPC. Share-partition assignment is also piggybacked on the heartbeat mechanism. Share groups only support server-side assignors, which implements the Share group membership is controlled by the group coordinator. Consumers in a share group use the heartbeat mechanism to join, leave and confirm continued membership of the share group, using the new ShareGroupHeartbeat RPC. Share-partition assignment is also piggybacked on the heartbeat mechanism. Share groups only support server-side assignors, which implement the new internal org.apache.kafka.coordinator.group.assignor.SharePartitionAssignorShareGroupPartitionAssignor  interface. There is just one implementation of this interface so far, org.apache.kafka.coordinator.group.assignorshare.SimpleShareAssignorSimpleAssignor .

For a share group, a rebalance is a much less significant event than for a consumer group because there’s no fencing. When a partition is assigned to a member of a share group, it’s telling the member that it should fetch records from that partition, which it may well be sharing with the other members of the share group. The members are not aware of each other, and there’s no synchronization barrier or fencing involved. The group coordinator, using the server-side assignor, is responsible for telling the members which partitions they are assigned and revoked. But the aim is to give every member useful work, rather than to keep the members' assignments safely separated.

...

  • A member joins or leaves the group.
  • A member updates its subscriptions.
  • A member is removed from the group by the group coordinator.
  • The partition metadata is updated, such as when a new partition is added or a topic is created or deleted, or when the rack ID changes.
  • AdminClient.alterShareGroupOffsets  is used to set the SPSO.

In all these casesIn all these cases, a new version of the group metadata is calculated by the group coordinator with an incremented group epoch. The new version of the group metadata signals that a new assignment is required for the group.

...

Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. For a share group, the group coordinator does not persist the assignment. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.

...

By assigning and revoking partitions for the members of the group, the group coordinator can balance the partitions across the members of the group.

The member provides the rebalance timeout to the group coordinator when it joins the group. This is the timeout for the group coordinator waiting for the member to acknowledge that it has adopted the target assignment. If the member does not confirm the target assignment within the rebalance timeout, the group coordinator removes the member from the group.

Member ID

Every member is uniquely identified by the UUID called the member ID. This UUID is generated by the group coordinator and given to the member when it joins the group. It is used in all communication with the group coordinator and must be kept during the entire lifespan of the member.

...

Share groups do not have the ASSIGNING state because only server-side assignors are supported, and do not need the RECONCILING state because there’s no need for all members to converge before the group enters the STABLE state. There is no automatic expiration of share groups.

  • EMPTY - When a share group is created or the last member leaves the group, the share group is EMPTY.
  • STABLE - When a share group has active members, the share group is STABLE.
  • DEAD - When the share group remains EMPTY for a configured period, the group coordinator transitions it to DEAD to delete it. This only happens if the group does not have any persistent share-group state. Share groups are intentionally more durable than consumer groups.

...

For a share group, the group coordinator persists three seven kinds of records:

  • ConsumerGroupMetadata ShareGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
  • ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
  • ShareGroupPartitionMetadata - this is written to persist the metadata about the partitions the group is assigning to the members.
  • ShareGroupTargetAssignmentMetadata - this is written to persist the assignment epoch.
  • ShareGroupTargetAssignmentMember - this is written to persist the target assignment for a member.
  • ShareGroupCurrentMemberAssignment - this is written to persist the current assignment for a member, and also to maintain the sequence of member epochs.
  • ShareGroupStatePartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.

When the group coordinator fails over, the newly elected coordinator loads replays the state from the __consumer_offsets  partition. This means a share group will remain in existence across the fail-over, along with the list of members . However, the assignments are not persisted and will be recalculated by the new group coordinatorand their assignments. It will bump the group epoch as a result.

The

...

SimpleAssignor

This KIP introduces org.apache.kafka.coordinator.group.assignorshare.SimpleShareAssignorSimpleAssignor .  It It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records. When calculating a new target assignment, the assignor is aware of the current assignment (assuming the group coordinator hasn't changed) and can use this to influence the calculation of the new target assignment. The calculation proceeds as follows:

  1. The assignor hashes the member IDs of the members and maps the partitions assigned to the members based on the hash. This gives approximately even balance.

  2. If any partitions

...

  1. were not assigned any members

...

  1. by (1) and do not have members already assigned in the current assignment, members are assigned round-robin until each partition has at least one member assigned to it.

  2. If any partitions were assigned members by (1) and also have members in the current assignment assigned by (2), the members assigned by (2) are removed.

When the number of members is greater than or equal to the number of partitions for a subscribed topic and every partition was assigned at least one member, adding more members to the group simply assigns more members to the partitions. If however, any partitions were not assigned any members by hashing, adding more members changes the distribution and will revoke some partitions previously assigned. Because assignments in share groups are looser than they are in consumer groups, the group coordinator doesn't require the members to confirm they have revoked partitions before they can be assigned to other members. For a brief period while the rebalancing is occurring, some partitions may have more members consuming than the assignment requires, but this situation soon resolves as the members receive their updated assignments and stop fetching from the revoked partitions.

...

State

Members subscribed to T1

Topic:partitions

Assignment change

Assignments

M1 subscribed to topic T1 with 1 partition

M1 (hash 1)

T1:0

Assign T1:0 to M1

M1 → T1:0

M2 joins the group and subscribes to T1

M1 (hash 1), M2 (hash 2)

T1:0

Assign T1:0 to M2

M1 → T1:0

M2 → T1:0

Add 3 partitions to T1

M1 (hash 1), M2 (hash 2)

T1:0, T1:1, T1:2, T1:3

Assign T1:1 and T1:3 to M2

Revoke T1:0 from M2

Assign T1:2 to M1

M1 → T1:0, T1:2

M2 → T1:1, T1:3

M3 joins the group and subscribes to T1

M1 (hash 1), M2 (hash 2), M3 (hash 3)

T1:0, T1:1, T1:2, T1:3

Assign T1:2 to M3

Revoke T1:2 from M1

M1 → T1:0

M2 → T1:1, T1:3

M3 → T1:2

M4 joins the group and subscribes to T1

M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4)

T1:0, T1:1, T1:2, T1:3

Assign T1:3 to M4

Revoke T1:3 from M2

M1 → T1:0

M2 → T1:1

M3 → T1:2

M4 → T1:3

M5, M6, M7, M8 join the group and subscribe to T1

M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4), M5 (hash 5), M6 (hash 6), M7 (hash 7), M8 (hash 8)

T1:0, T1:1, T1:2, T1:3

Assign T1:0 to M5

Assign T1:1 to M6

Assign T1:2 to M7

Assign T1:3 to M8

M1 → T1:0

M2 → T1:1

M3 → T1:2

M4 → T1:3

M5 → T1:0

M6 → T1:1

M7 → T1:2

M8 → T1:3

M1 subscribes to T2 with 2 partitions

M1 (hash 1), M2 (hash 2), M3 (hash 3), M4 (hash 4), M5 (hash 5), M6 (hash 6), M7 (hash 7), M8 (hash 8)

T1:0, T1:1, T1:2, T1:3

T2:0, T2:1

Assign T2:0 and T2:1 to M1

M1 → T1:0, T2:0, T2:1

M2 → T1:1

M3 → T1:2

M4 → T1:3

M5 → T1:0

M6 → T1:1

M7 → T1:2

M8 → T1:3

All members leave except M2

M2 (hash 2)

T1:0, T1:1, T1:2, T1:3

Assign T1:0, T1:2, T1:3 to M2

M2 → T1:0, T1:1, T1:2, T1:3

SimpleShareAssignor SimpleAssignor does not make assignments based on rack IDs. SimpleShareAssignor SimpleAssignor does not make assignments based on lag or consumer throughput.

...

The SPEO is not necessarily always at the end of the topic-partition and it just advances freely as records are fetched beyond this point. The segment of the topic-partition between the SPSO and the SPEO is a sliding window that moves as records are consumed. The share-partition leader limits the distance between the SPSO and the SPEO. The upper bound is controlled by the broker configuration group.share.recordpartition.lockmax.partitionrecord.limitlocks. Unlike existing queuing systems, there’s no “maximum queue depth”, but there is a limit to the number of in-flight records at any point in time.

...

For the read_uncommitted isolation level, which is the default, the share group consumes all non-transactional and transactional records. The SPEO is bounded by the high-water mark.

For the read_committed isolation level, the share group only consumes non-transactional records and committed transactional records. The set of records which are eligible to become in-flight records are non-transactional records and committed transactional records only. The SPSO SPEO cannot move past the last stable offset, so an open transaction blocks the progress of the share group with read_committed isolation level. The share-partition leader itself is responsible for filtering out transactional records which have been aborted. This can be done looking at the headers of the record batches without having to iterate of the individual records.

The records are fetched from the replica manager using FetchIsolation.TXN_COMMITTED . This means that the replica manager only returns records up to the last stable offset, meaning that any transactional records fetched were part of transactions which have already been completed. The replica manager also returns a list of aborted transactions relevant to the records which are fetched, and the share-partition leader uses this to filter out records for transactions which aborted.

...

Share sessions use memory on the share-partition leader. Each broker that is a share-partition leader has a cache of share sessions. Because a share session is an integral part of how share groups work, as opposed to a performance optimisation in the manner of fetch sessions, the limit is calculated based on by multiplying the share group configurations group.share.max.groups  and group.share.max.size . Sessions are evicted if they have been inactive for more than 120,000 milliseconds (2 minutes). The key of the cache is the pair (GroupId, MemberId).

...

In order to retain similarity with KafkaConsumer  and make it easy for applications to move between the two interfaces, KafkaShareConsumer  follows the same threading rules as KafkaConsumer . It is not thread-safe and only one thread at a time may called the call the methods of KafkaShareConsumer . Unsynchronized access will result in ConcurrentModificationException . The only exception to this rule is KafkaShareConsumer.wakeup()  which may be called from any thread.

...

There are also some internal APIs which are used by the share-group persister to communicate with the share coordinator. These are inter-broker RPCs and they are authorized as cluster actions.

A table containing the ACLs for the new RPCs can be found in the section on changes to the Kafka protocol below.

Managing durable share-partition state

...

The share coordinator will prefer to write a snapshot over an update (for example, when the SPSO moves and there are no in-flight records, the snapshot will be small and there’s no need to write an update instead). The share coordinator will take a snapshot periodically, frequently enough to minimise the number of ShareUpdate records to replay but rarely enough to minimise the performance cost of taking snapshots.

There are two kinds of fencing for share-group state.

The records also include a state epoch. This is used to ensure that all of the components involved are aligned on the current state, and to fence any calls to write to an old version of the state. Whenever the share-group state is initialized, the state epoch is set to the share group's current group epoch. This gives a very simple way to make sure that reads and writes refer to the current version of the state.

The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):also include a leader epoch. Whenever the share-partition leader calls the share coordinator, it provides the leader epoch of the partition in the request. The share coordinator uses this to fence zombie share-partition leaders. When a new leader is elected for a share-partition, the leader epoch of the partition is incremented. This means that the new leader will use a higher leader epoch in its requests to the share coordinator, and any trailing requests from earlier share-partition leaders can be rejected with FENCED_LEADER_EPOCH . The leader epoch is persisted by the share coordinator in its ShareSnapshot and ShareUpdate records. When a new state epoch is used, the leader epoch is initialized to -1 , and it is properly initialized when a share-partition leader makes a request.

The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):

Type

Type

Key

Value

ShareSnapshot


Code Block
Version: 0
GroupId: string
TopicId: uuid
Partition: int32



Code Block
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64
States[]:
  
BaseOffset
FirstOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16


ShareUpdate


Code Block
Version: 1
GroupId: string
TopicId: uuid
Partition: int32



Code Block
StateEpoch: uint32
SnapshotEpoch: uint32
StartOffset: int64 (can be -1 if not updated)
States[]:
  
BaseOffset
FirstOffset: int64
  LastOffset: int64
  DeliveryState: int8
  DeliveryCount: int16


When a share coordinator is elected, it replays the records on its partition of the share-group state topic. It starts at the earliest offset, reading all of the records until the end of the partition. For each share-partition it finds, it needs to replay the latest ShareSnapshot and any subsequent ShareUpdate records whose snapshot epoch matches. Until replay is complete for a share-partition, the RPCs for mutating the share-state return the COORDINATOR_LOAD_IN_PROGRESS error code.

...

The share-group state data is not very amenable to log compaction. As a result, the share coordinator uses unlimited log retention and prunes the log records itself using ReplicaManager.deleteRecords. The share coordinator can delete all records before the latest ShareSnapshot for all active share-partitions. By taking periodic snapshots, the latest ShareSnapshot is replaced. For idle share-partitions, the share coordinator will periodically write a new ShareSnapshot so the older records can be pruned.

Using the share coordinator from the group coordinator

When a share-partition leader receives its first ShareFetch request for a topic-partition, it needs to initialize its The group coordinator is responsible for asking the share coordinator to initialize and delete the durable share-partition state. It finds the share coordinator using the FindCoordinator RPC using (key: "groupId:topicId:partition", key_type: SHARE ). Then, it sends the ReadShareGroupState RPC to the share coordinator. If the share coordinator has no share-partition state to return, it returns the UNKNOWN_TOPIC_OR_PARTITION error code indicating that this share-partition is not actually part of the share group. Otherwise, it returns the state to the share-partition leader which uses it to initialize and begin fetching records for the consumers. The SPSO returned might be -1 indicating that the initial SPSO needs to be set based on the group.share.auto.offset.reset configuration.

The share-partition leader must be aware of when the group coordinator is being used to alter the SPSO with a KafkaAdmin.alterShareGroupOffsets request. This only occurs when the group is empty. As a result, when the set of share sessions transitions from 0 to 1, the share-partition leader uses the ReadShareGroupOffsetsState RPC to validate its state epoch (this request is much cheaper for the share coordinator to handle than ReadShareGroupState ). We know that there are no acquired records, so re-initializing the share-partition leader is non-disruptive. If the state epoch has changed, the share-partition leader issues a ReadShareGroupState RPC to the share coordinator and uses the response to re-initialize.  

When a share-partition leader needs to update the durable share-partition state because of an acknowledgement or other state changed (such as a lock timeout), it sends the WriteShareGroupState RPC to the share coordinator. The share coordinator keeps track of the accumulated state of the share-partition and chooses how to record it to the share-state topic. Once it has successfully written to the topic and replication has completed, the RPC response is sent.

When a share partition is removed from a share group, perhaps because the topic is deleted or the administrator deletes the share-partition offsets (see KafkaAdmin.deleteShareGroupOffsets), the share-partition leader sends the DeleteShareGroupState RPC to the share coordinator. The share coordinator writes a final ShareSnapshot record with the special snapshot epoch value of -1. This acts as a deletion marker so the recovery processing of the share-group state topic sees the share-partition has been removed. There is no need to retain the deletion marker. Its only purpose is to make sure that a share coordinator reading records for a share-partition before it was removed, notices that those records apply to a defunct share-partition.

Examples with share-group state

Here are some examples showing the writing of share-group state.

The group coordinator uses the ShareGroupStatePartitionMetadata record to keep track of which share-partitions have been initialized.

The state is initialized in the following cases:

  • When a topic is added to the set of subscribed topics for a share group and is not yet in the ShareGroupStatePartitionMetadata record
  • When partitions are added to a topic which is already in the ShareGroupStatePartitionMetadata record
  • When KafkaAdmin.alterShareGroupOffsets  is used to reset the SPSO for a share-partition

The state is deleted in the following cases:

  • When a topic in the ShareGroupStatePartitionMetadata record is deleted
  • When KafkaAdmin.deleteShareGroupOffsets  is used to delete state for a share-partition, most likely as a result of an administrator cleaning up a topic which is no longer in use by this share group
  • When the share group is deleted

The group coordinator periodically reconciles its "hard" state in the ShareGroupStatePartitionMetadata with the "soft" state in the cluster metadata. This is how it observes relevant changes to topics, such as topic deletion. The ShareGroupStatePartitionMetadata contains the set of topic-partitions which are known to be initialized.

In order to add a topic, the group coordinator:

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic added.

In order to add partitions to a known topic, the group coordinator:

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the new partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the array of partitions for the topic increased.

In order to remove a topic, the group coordinator:

  • If the topic still exists, it writes a ShareGroupStatePartitionMetadata record with the topic added to the DeletingTopics  array. This enables group coordinator failover to continue the deletion. If the topic does not exist, the requirement to continue deletion can be inferred by the non-existence of the topic.
  • Sends the DeleteShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic removed.

Using the share coordinator from the share-partition leader

When a share-partition leader receives its first ShareFetch request for a topic-partition, it needs to initialize its share-partition state. It finds the share coordinator using the FindCoordinator RPC using (key: "groupId:topicId:partition", key_type: SHARE ). Then, it sends the ReadShareGroupState RPC to the share coordinator. If the share coordinator has no share-partition state to return, it returns the UNKNOWN_TOPIC_OR_PARTITION error code indicating that this share-partition is not actually part of the share group. Otherwise, it returns the state to the share-partition leader which uses it to initialize and begin fetching records for the consumers. The SPSO returned might be -1 indicating that the initial SPSO needs to be set based on the group.share.auto.offset.reset configuration.

The share-partition leader must be aware of when the group coordinator is being used to alter the SPSO with a KafkaAdmin.alterShareGroupOffsets request. This only occurs when the group is empty. As a result, when the set of share sessions transitions from 0 to 1, the share-partition leader uses the ReadShareGroupStateSummary RPC to validate its state epoch (this request is much cheaper for the share coordinator to handle than ReadShareGroupState ). We know that there are no acquired records, so re-initializing the share-partition leader is non-disruptive. If the state epoch has changed, the share-partition leader issues a ReadShareGroupState RPC to the share coordinator and uses the response to re-initialize.  

When a share-partition leader needs to update the durable share-partition state because of an acknowledgement or other state changed (such as a lock timeout), it sends the WriteShareGroupState RPC to the share coordinator. The share coordinator keeps track of the accumulated state of the share-partition and chooses how to record it to the share-state topic. Once it has successfully written to the topic and replication has completed, the RPC response is sent.

When a share partition is removed from a share group, perhaps because the topic is deleted or the administrator deletes the share-partition offsets (see KafkaAdmin.deleteShareGroupOffsets), the share-partition leader sends the DeleteShareGroupState RPC to the share coordinator. The share coordinator writes a final ShareSnapshot record with the special snapshot epoch value of -1. This acts as a deletion marker so the recovery processing of the share-group state topic sees the share-partition has been removed. There is no need to retain the deletion marker. Its only purpose is to make sure that a share coordinator reading records for a share-partition before it was removed, notices that those records apply to a defunct share-partition.

Examples with share-group state

Here are some examples showing the writing of share-group state.

Operation

State changes

Cumulative state

WriteShareGroupState request

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}


In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110

Operation

State changes

Cumulative state

WriteShareGroupState request

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100

Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}

In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards

Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)

Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 110,
  "StateBatches": []
}


With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records


Fetch records 110-119

Consumer 1 get 110-112, consumer 2 gets 113-118, consumer 3 gets 119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110 (consumer 1)

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffsetFirstOffset": 110,
      "LastOffset": 110,
      "DeliveryState": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 119 (consumer 3)

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffsetFirstOffset": 119,
      "LastOffset": 119,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120 (consumer 1)

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapsed 111, 112 (consumer 1's records)

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffsetFirstOffset": 111,
      "LastOffset": 112,
      "DeliveryState": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118 (consumer 2)

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffsetFirstOffset": 113,
      "LastOffset": 118,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 111, 112 (consumer 3)

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, record 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110 (consumer 1)

SPSO=111

SPSO=111, SPEO=121, record 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "BaseOffsetFirstOffset": 110,
      "LastOffset": 110,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}


Acknowledge 111, 112 (consumer 3)

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 120,
  "StateBatches": []
}


...

Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The share coordinator is responsible for share-group persistent state.

Image RemovedImage Added

The following table explains how the administration operations on share groups work.

OperationInvolvesNotes
Create share groupGroup coordinator

This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat  RPC. It writes a ShareGroupMetadata record and a ConsumerGroupMetadata ShareGroupPartitionMetadata record for the share group, a ShareGroupMemberMetadata record for the member, and a ShareGroupPartitionMetadata record for the share-partitions which are just about to be initialized, onto member onto the __consumer_offsets  topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat  RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState  to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState  RPC. It writes a ShareSnapshot record to the __share_group_state  topic. When the record is replicated, the share coordinator responds to the RPC.

d) Back in the group coordinator, it writes an updated ShareGroupPartitionMetadata a ShareGroupPartitionStateMetadata record on the __consumer_offsets  topic for the share-partitions which are now initialized. The share-partition is partitions are now able to be included in an assignment in the share group.

Assign a share-partitionGroup coordinator and optionally share coordinatorWhen a topic-partition is eligible to be assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets  topic and sends an InitializeShareGroupState  request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state  topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group.
List share groupsGroup coordinator
Describe share groupGroup coordinator
List share group offsetsGroup coordinator and share coordinatorThe admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupOffsetsState ReadShareGroupStateSummary  request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share-coordinator when it is next required.
Alter share group offsetsGroup coordinator and share coordinatorOnly empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata, and sends an InitializeShareGroupState  request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state  topic. If the partition was not previously initialized, the group coordinator writes an updated ShareGroupStatePartitionMetadata record.
Delete share group offsetsGroup coordinator and share coordinatorThis is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record to the __consumer_offsets  topic adding the topic to record the pending deletion of the offsets. It then the DeletingTopics  array and removing it from the InitializedTopics  array. This enables an interrupted deletion to be completed. The group coordinator sends a DeleteShareGroupState  request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata an updated ShareGroupStatePartitionMetadata record to the __consumer_offsets  topic to complete the deletion of the topic from the offsetsshare group.
Delete share groupGroup coordinator and share coordinator

Only empty share groups can be deleted. The group coordinator If the share group has any topics with initialized state, it writes a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record to the __consumer_offsets  topic to record the pending deletion of all share-group state. It then moving all initialized topics into the DeletingTopics  array, and then it sends a DeleteShareGroupState  request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state  topic. Then, the group coordinator writes a tombstone ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata and finally a tombstone ConsumerGroupMetadata record ShareGroupMetadata record to the __consumer_offsets  topic.

...

Method signatureDescription
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets)Alter offset information for a share group.
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) Alter offset information for a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitionsSet<String> topics)Delete offset information for a set of partitions topics in a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitionsSet<String> topics, DeleteShareGroupOffsetsOptions options) Delete offset information for a set of partitions topics in a share group.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds)Delete share groups from the cluster.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupOptions options) Delete share groups from the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds)Describe some share groups in the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) Describe some share groups in the cluster.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs)List the share group offsets available in the cluster for the specified share groups.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) List the share group offsets available in the cluster for the specified share groups.
ListShareGroupsResult listShareGroups()List the share groups available in the cluster.
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) ListGroupsResult listGroups(ListGroupsOptions options) List the share groups available in the cluster.
ListGroupsResult listGroups() List the groups available in the cluster.
List the groups available in the cluster.

...

Code Block
languagejava
    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition.
     * @return The AlterShareGroupOffsetsResult.
     */
    default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
        return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
    }

    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This operation is not transactional so it may succeed for some partitions while fail for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterShareGroupOffsetsResult.
     */
    AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options);

    /**
     * Delete offsets for a set of partitionstopics in a share group with the default
     * options. This will succeed at the partition level only if the group is not actively
     * subscribed to the corresponding topic.
     *
     * <p>This is a convenience method for {@link #deleteShareGroupOffsets(String, MapSet, DeleteShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to delete offsets.
     * @param topics The topics.
     * @return The DeleteShareGroupOffsetsResult.
     */
    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition>Set<String> partitionstopics) {
        return deleteShareGroupOffsets(groupId, partitionstopics, new DeleteShareGroupOffsetsOptions());
    }

    /**
     * Delete offsets for a set of partitionstopics in a share group. This will
     *
 succeed at the partition level* only@param ifgroupId theThe group for iswhich notto activelydelete subscribedoffsets.
     * to@param thetopics correspondingThe topictopics.
       *
     * @param options The options to use when deleting offsets in a share group.
     * @return The DeleteShareGroupOffsetsResult.
     */
    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
        Set<TopicPartition>Set<String> partitionstopics,
        DeleteShareGroupOffsetsOptions options);

    /**
     * Delete share groups from the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupIds The IDs of the groups to delete.
     * @return The DeleteShareGroupsResult.
     */
    default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
    }

    /**
     * Delete share groups from the cluster.
     *
     * @param groupIds The IDs of the groups to delete.
     * @param options The options to use when deleting a share group.
     * @return The DeleteShareGroupsResult.
     */
    DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);

    /**
     * Describe some share groups in the cluster, with the default options.
     *
     * <p>This is a convenience method for {@link #describeShareGroups(Collection, DescribeShareGroupsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.
     * @return The DescribeShareGroupsResult.
     */
    default DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) {
        return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
    }

    /**
     * Describe some share groups in the cluster.
     *
     * @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * @return The DescribeShareGroupsResult.
     */
    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
                                                 DescribeShareGroupsOptions options);

    /**
     * List the share group offsets available in the cluster for the specified share groups with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}
     * to list offsets of all partitions for the specified share groups with default options.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @return The ListShareGroupOffsetsResult
     */
    default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) {
        return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
    }

    /**
     * List the share group offsets available in the cluster for the specified share groups.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @param options The options to use when listing the share group offsets.
     * @return The ListShareGroupOffsetsResult
     */
    ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options);

    /**
     * List the share groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroups(ListShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListShareGroupsResult.
     */
    default ListShareGroupsResult listShareGroups() {
        return listShareGroups(new ListShareGroupsOptions());
    }

    /**
     * List the share groups available in the cluster.
     *
     * @param options The options to use when listing the share groups.
     * @return The ListShareGroupsResult.
     */
    ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);
 
    

AlterShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result  * Listof the groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listGroups(ListGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListGroupsResult{@link Admin#alterShareGroupOffsets(String groupId, Map<TopicPartition, Long>), AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
    /**
     * Return a future which succeeds if all the alter offsets succeed.
     */
    defaultpublic ListGroupsResultKafkaFuture<Void> listGroupsall() {
        return listGroups(new ListGroupsOptions());
    }

      /**
     * ListReturn thea groupsfuture availablewhich incan thebe cluster.
used to check the result *
for a    * @param options The options to use when listing the groupsgiven partition.
     */
  @return The ListGroupsResult.
public KafkaFuture<Void> partitionResult(final TopicPartition partition) */{
    ListGroupsResult listGroups(ListGroupsOptions);

...

}
}

AlterShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.clientsclient.admin;
 
/**
 * TheOptions resultfor of the {@link Admin#alterShareGroupOffsets(String groupId, Map<TopicPartition, Long>), AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions> {
}

DeleteShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroupOffsets(String, Set<String>, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResultDeleteShareGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the alterdeletions offsets succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partitiontopic.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartitionString partitiontopic) {
    }
}

...

DeleteShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#alterShareGroupOffsetsAdmin#deleteShareGroupOffsets(String groupId, Map<TopicPartitionSet<String>, Long>), AlterShareGroupOffsetsOptionsDeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptionsDeleteShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions>AbstractOptions<DeleteShareGroupOffsetsOptions> {
}

...

DeleteShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroupOffsetsAdmin#deleteShareGroups(StringCollection<String>, Set<TopicPartition>, DeleteShareGroupOffsetsOptionsDeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResultDeleteShareGroupsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future map from group id to futures which can be used to check the resultstatus forof aindividual given partitiondeletions.
     */
    public Map<String, KafkaFuture<Void>KafkaFuture<Void>> partitionResultdeletedGroups(final TopicPartition partition) {
    }
}

...

DeleteShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#deleteShareGroupOffsetsAdmin#deleteShareGroups(StringCollection<String>, Set<TopicPartition>, DeleteShareGroupOffsetsOptionsDeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsOptionsDeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions>AbstractOptions<DeleteShareGroupsOptions> {
}

...

DescribeShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroupsAdmin#describeShareGroups(Collection<String>, DeleteShareGroupsOptionsDescribeShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsResultDescribeShareGroupsResult {
    /**
     * Return a future which yields all succeedsShareGroupDescription onlyobjects, if all the deletionsdescribes succeed.
     */
    public KafkaFuture<Map<String, KafkaFuture<Void>ShareGroupDescription>> all() {
    }

    /**
     * Return a map from group id to futures which canyield begroup useddescriptions.
 to check the status of individual deletions.
     */
    public Map<String, KafkaFuture<Void>>KafkaFuture<ShareGroupDescription>> deletedGroupsdescribedGroups() {
    }
}

...

ShareGroupDescription

This class does indeed reuse the MemberDescription  class intended for consumer groups. It is sufficiently general to work for share groups also.

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupsOptions> {
}

DescribeShareGroupsResult

Code Block
languagejava
package import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.clientsacl.adminAclOperation;
 
/**
 * A Thedetailed resultdescription of a single theshare {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)} callgroup in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsResultShareGroupDescription {
  public  /**
     * Return a future which yields all ShareGroupDescription objects, if all the describes succeed.
     */
    public KafkaFuture<Map<String, ShareGroupDescription>> all() {
    }

    /**
     * Return a map from group id to futures which yield group descriptions.
     */
    public Map<String, KafkaFuture<ShareGroupDescription>> describedGroups() {
    }
}

ShareGroupDescription

This class does indeed reuse the MemberDescription  class intended for consumer groups. It is sufficiently general to work for share groups also.

Code Block
languagejava
package org.apache.kafka.client.admin;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.acl.AclOperation;

/**
 * A detailed description of a single share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupDescription {
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator);
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator, Set<AclOperation> authorizedOperations);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * A list of the members of the share group.
   */
  public Collection<MemberDescription> members();

  /**
   * The share group state, or UNKNOWN if the state cannot be parsed.
   */
  public ShareGroupState state();

  /**
   * The share group coordinator, or null if the coordinator is not known.
   */
  public Node coordinator();

  /**
   * The authorized operations for this group, or null if that information is not known.
   */
  public Set<AclOperation> authorizedOperations();
}

...

ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator);
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator, Set<AclOperation> authorizedOperations);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * A list of the members of the share group.
   */
  public Collection<MemberDescription> members();

  /**
   * The share group state, or UNKNOWN if the state cannot be parsed.
   */
  public ShareGroupState state();

  /**
   * The group coordinator, or null if the coordinator is not known.
   */
  public Node coordinator();

  /**
   * The authorized operations for this group, or null if that information is not known.
   */
  public Set<AclOperation> authorizedOperations();
}

DescribeShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsOptions extends AbstractOptions<DescribeShareGroupsOptions> {
    public DescribeShareGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);

    public boolean includeAuthorizedOperations();
}

ListShareGroupOffsetsResult

The offset returned for each topic-partition is the share-partition start offset (SPSO).

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
    /**
     * Return a future which yields all Map<String, Map<TopicPartition, Long> objects, if requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
    }

    /**
     * Return a future which yields a map of topic partitions to offsets for the specified group.
     */
    public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String groupId) {
    }
}

ListShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for {@link Admin#describeShareGroups(Collection<String>Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, DescribeShareGroupsOptionsListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsOptionsListShareGroupOffsetsOptions extends AbstractOptions<DescribeShareGroupsOptions>AbstractOptions<ListShareGroupOffsetsOptions> {
    public DescribeShareGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);

    public boolean includeAuthorizedOperations();
}

ListShareGroupOffsetsResult

...

}

ListShareGroupOffsetsSpec

Code Block
languagejava
package org.apache.kafka.clientsclient.admin;
 
/**
 * The result of the Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResultListShareGroupOffsetsSpec {
  public ListShareGroupOffsetsSpec();

  /**
   * Set *the Returntopic apartitions futurewhose whichoffsets yieldsare allto Map<String, Map<TopicPartition, Long> objects, if requests for all the groups succeed.
  be listed for a share group.
   */
  ListShareGroupOffsetsSpec  public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
    }topicPartitions(Collection<TopicPartition> topicPartitions);

    /**
     * ReturnReturns athe futuretopic whichpartitions yieldswhose aoffsets mapare ofto topic partitions to offsetsbe listed for thea specifiedshare group.
     */
    public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String groupId) {
    }Collection<TopicPartition> topicPartitions();
}

...

ListShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clientclients.admin;
 
/**
 * The result Optionsof forthe {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptionsAdmin#listShareGroups(ListShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsOptionsListShareGroupsResult extends{
 AbstractOptions<ListShareGroupOffsetsOptions> {
}

ListShareGroupOffsetsSpec

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
  * Specification of share* groupReturns offsetsa tofuture listthat usingyields {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsSpec {
  public ListShareGroupOffsetsSpec();

either an exception, or the full set of share group listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> all() {
    }

    /**
     * SetReturns thea topicfuture partitionswhich whoseyields offsetsjust arethe tovalid belistings.
 listed for a share group.
   */*/
    public KafkaFuture<Collection<ShareGroupListing>> valid() {
  ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions);

 }
 
    /**
     * Returns a thefuture topicwhich partitionsyields whosejust offsetsthe areerrors towhich beoccurred.
 listed for a share group.*/
   */
 public Collection<TopicPartition>KafkaFuture<Collection<Throwable>> topicPartitionserrors(); {
    }
}

...

ShareGroupListing

Code Block
languagejava
package org.apache.kafka.clientsclient.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * TheA resultlisting of a share thegroup {@link Admin#listShareGroups(ListShareGroupsOptions)} callin the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsResultShareGroupListing {
  public  /**ShareGroupListing(String groupId);
  public ShareGroupListing(String groupId, Optional<ShareGroupState> *state);

 Returns a/**
 future that yields* eitherThe an exception, orid of the full set of share group listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>>String allgroupId() {
    };

    /**
     * ReturnsThe ashare future which yields just the valid listingsgroup state.
     */
    public KafkaFuture<Collection<ShareGroupListing>>Optional<ShareGroupState> validstate() {
    }
 
    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors() {
    }
}

ShareGroupListing

;
}

ListShareGroupsOptions

Code Block
languagejava
package org.apache.
Code Block
languagejava
package org.apache.kafka.client.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * AOptions listingfor of a share group in the cluster{@link Admin#listShareGroups(ListShareGroupsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsOptions extends ShareGroupListingAbstractOptions<ListShareGroupsOptions> {
  public ShareGroupListing(String groupId);
  public ShareGroupListing(String groupId, Optional<ShareGroupState> state);

  /**
   * The id of the share group.
 /**
     * If states is set, only groups in these states will be returned. Otherwise, all groups are returned.
     */
    public StringListShareGroupsOptions groupIdinStates(Set<ShareGroupState> states);

    /**
     * The share group state.
 Return the list of States that are requested or empty if no states have been specified.
     */
    public Optional<ShareGroupState>Set<ShareGroupState> statestates();
}

ListShareGroupsOptions

...

languagejava

...

GroupType

Another case is added to the org.apache.kafka.common.

...

GroupType  enum:

Enum constantDescription
SHARE("share") Share group

ShareGroupState

ListGroupsResult

...

languagejava

...

A new enum

org.apache.kafka.common.ShareGroupState  is added:

Enum constant

DEAD 

EMPTY 

STABLE 

UNKNOWN 

Its definition follows the pattern of ConsumerGroupState with fewer states.

Exceptions

The following new exceptions are added to the org.apache.kafka.common.errors  package corresponding to the new error codes in the Kafka protocol.

  • InvalidRecordStateException  - The record state is invalid. The acknowledgement of delivery could not be completed.
  • ShareSessionNotFoundException  - The share session is not found.
  • InvalidShareSessionEpochException  - The share session epoch is invalid.
  • FencedStateEpochException  - The share coordinator rejected the request because the share-group state epoch did not match.

They are all subclasses of RetriableException .

Broker API

ConsumerGroupPartitionAssignor

The new org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor  interface is an interface implemented by server-side assignors for consumer groups. It signifies that the partition assignor is suitable for use with consumer groups.

Code Block
languagejava
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;
clients.admin;
  
/**
 * The result of the {@link Admin#listGroups(ListGroupsOptions)} call.
 * <p>
  * Server-side partition assignor for consumer groups used by the GroupCoordinator.
 *
 * The APIinterface ofis thiskept classin isan evolving,internal seemodule {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListGroupsResult {
    /**
     * Returns a future that yields either an exception, or the full set of group listings.
     */
    public KafkaFuture<Collection<GroupListing>> all() {
    }
 
    /**
     * Returns a future which yields just the valid listings.
     */
    public KafkaFuture<Collection<GroupListing>> valid() {
    }
  
    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors() {
    }
}

GroupListing

...

languagejava

...

until KIP-848 is fully
 * implemented and ready to be released.
 */
@InterfaceStability.Unstable
public interface ConsumerGroupPartitionAssignor extends PartitionAssignor {
}

The two built-in partition assignors for consumer groups, org.apache.kafka.coordinator.group.assignor.RangeAssignor  and org.apache.kafka.

...

coordinator.group.assignor.UniformAssignor , are both changed to implement this interface instead of org.apache.kafka.coordinator.group.assignor.PartitionAssignor  because they are intended only for use with consumer groups.

ShareGroupPartitionAssignor

The new org.apache.kafka.coordinator.group.assignor.ShareGroupPartitionAssignor  interface is an interface implemented by server-side assignors for share groups. It signifies that the partition assignor is suitable for use with share groups.

...

Code Block
languagejava
package org.apache.kafka.coordinator.clientgroup.adminassignor;

import org.apache.kafka.common.annotation.GroupTypeInterfaceStability;

/**
 * Options for {@link Admin#listGroups(ListGroupsOptions)} Server-side partition assignor for share groups used by the GroupCoordinator.
 *
 * The APIinterface ofis thiskept classin isan evolving,internal seemodule {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
    /**
     * If types is set, only groups of these types will be returned. Otherwise, all groups are returned.
     */
    public ListGroupsOptions types(Set<GroupType> types);

    /**
     * Return the list of types that are requested or empty if no types have been specified.
     */
    public Set<GroupType> types();
}

GroupType

Another case is added to the org.apache.kafka.common.GroupType  enum:

...

until KIP-932 is fully
 * implemented and ready to be released.
 */
@InterfaceStability.Unstable
public interface ShareGroupPartitionAssignor extends PartitionAssignor {
}

One implementation of this interface, org.apache.kafka.coordinator.group.share.SimpleAssignor, is provided.

Command-line tools

kafka-share-groups.sh

A new tool called kafka-share-groups.sh is added for working with share groups. It has the following options:

Option

Description

--all-topics

Consider all topics assigned

ShareGroupState

A new enum org.apache.kafka.common.ShareGroupState  is added:

...

DEAD 

...

EMPTY 

...

STABLE 

...

UNKNOWN 

Its definition follows the pattern of ConsumerGroupState with fewer states.

Exceptions

The following new exceptions are added to the org.apache.kafka.common.errors  package corresponding to the new error codes in the Kafka protocol.

  • InvalidRecordStateException  - The record state is invalid. The acknowledgement of delivery could not be completed.
  • ShareSessionNotFoundException  - The share session is not found.
  • InvalidShareSessionEpochException  - The share session epoch is invalid.

They are all subclasses of RetriableException .

Command-line tools

kafka-share-groups.sh

A new tool called kafka-share-groups.sh is added for working with share groups. It has the following options:

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Pass in groups to delete topic partition offsets over the entire Delete share group. For instance --group g1 --group g2

--delete-offsets

Delete offsets of share group. Supports one share group at the time, and multiple topics.

--describe

Describe share group, members and list offset lag (number of records not yet processed) related to given groupoffset information.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--help

Print usage information.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of and may be used with the '--describe' option only.

--reset-offsets

Reset offsets of share group. Supports one share group at a time, and instances must be inactive. Has 2 execution options, '--dry-run' (the default) and '–execute'. Has 3 reset options: '--to-earliest', '--to-latest' and '--to-datetime'. 

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Stable' and 'Dead'.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default. (default: 5000)   

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or topic which should be included in the reset offset process. When resetting offsets, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included.

--version

Display Kafka version.

...

ConfigurationDescriptionValues
group.share.enable.coordinator.rebalance.protocols The list of enabled rebalance protocols. (Existing configuration)

"share"  is included in the list of protocols

Whether

to enable share groups

on the broker

.

Default false  while the feature is being developed. Will become true  in a future release

This will be added to the default value of this configuration property once this feature is complete.

group.share.delivery.count.limitThe maximum number of delivery attempts for a record delivered to a share group.Default 5, minimum 2, maximum 10
group.share.record.lock.duration.msShare-group record acquisition lock duration in milliseconds.Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)
group.share.min.record.lock.duration.ms Share-group record acquisition lock minimum duration in milliseconds.Default 15000 (15 seconds), minimum 1000 (1 second), maximum 30000 (30 seconds)
group.share.max.record.lock.duration.msShare-group record acquisition lock maximum duration in milliseconds.Default 60000 (60 seconds), minimum 1000 30000 (1 second30 seconds), maximum 3600000 (1 hour)
group.share.recordpartition.lockmax.partitionrecord.limitlocksShare-group record lock limit per share-partition.Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms 

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.groups 

The maximum number of share groups.

Default 10, minimum 1, maximum 100
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000
group.share.assignors 

The server-side assignors as a list of full class names. The list must contain only a single entry which is used by all groups. In the initial delivery, only the first one in the list is usedfuture, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors.

A list of class names, currently limited to a single entry. Default "org.apache.kafka.servercoordinator.group.share.SimpleAssignor"
groupshare.sharecoordinator.state.topic.num.partitions 

The number of partitions for the share-group state topic (should not change after deployment).

Default 50
groupshare.sharecoordinator.state.topic.replication.factor 

The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
groupshare.sharecoordinator.state.topic.segment.bytes 

The log segment size for the share-group state topic.

Default 104857600
groupshare.sharecoordinator.state.topic.min.isr 

Overridden min.insync.replicas for the share-group state topic.

Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)

Group configuration

share.coordinator.state.topic.compression.codec 

Compression codec for the share-group state topic.

Default 0 (NONE)
share.coordinator.append.linger.ms 

The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.

Default 10, minimum 0
share.coordinator.load.buffer.size 

Batch size for reading from the share-group state topic when loading state information into the cache (soft-limit, overridden if records are too large).

Default 5 * 1024 * 1024 (5242880), minimum 1
share.coordinator.snapshot.update.records.per.snapshot 

The number of update records the share coordinator writes between snapshot records.

Default 500, minimum 0
share.coordinator.threads 

The number of threads used by the share coordinator.

Default 1, minimum 1
share.coordinator.write.timeout.ms 

The duration in milliseconds that the share coordinator will wait for all replicas of the share-group state topic to receive a write.

Default 5000, minimum 1
share.fetch.purgatory.purge.interval.requests 

The purge interval (in number of requests) of the share fetch request purgatory

Default 1000

Group configuration

The following The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.

ConfigurationDescriptionValues
group.share.isolation.level 

Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all messages, even transactional messages which have been aborted. Non-transactional records will be returned unconditionally in either mode.

Valid values "read_committed"  and "read_uncommitted" (default)

group.share.auto.offset.reset 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:

  • "earliest" : automatically reset the offset to the earliest offset

  • "latest" : automatically reset the offset to the latest offset

Valid values "latest"  (default) and "earliest" 

group.share.record.lock.duration.ms 

Record acquisition lock duration in milliseconds.

nullIf not specified, which uses the cluster broker configuration group.share.record.lock.duration.ms.

If specified, minimum limited by the broker configuration group.share.min.record.lock.duration.ms , minimum 1000, maximum limited by the cluster broker configuration group.share.max.record.lock.duration.ms

group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

If not specified, uses the broker configuration group.share.heartbeat.interval.ms .

If specified, minimum limited by the broker configuration group.share.min.heartbeat.interval.ms , maximum limited by the broker configuration group.share.max.heartbeat.interval.ms

group.type 

Ensures that a newly created group has the specified group type.

.share.session.timeout.ms 

The timeout to detect client failures when using the share group protocol.

If not specified, uses the broker configuration group.share.session.timeout.ms .

If specified, minimum limited by the broker configuration group.share.min.session.timeout.ms , maximum limited by the broker configuration group.share.max.session.timeout.ms .Valid values: "consumer"  or "share" , there is no default

Consumer configuration

The existing consumer configurations apply for share groups with the following exceptions:

  • auto.offset.reset : this is handled by a dynamic group configuration group.share.auto.offset.reset 
  • enable.auto.commit  and auto.commit.interval.ms : share groups do not support auto-commit
  • group.instance.id : this concept is not supported by share groups
  • isolation.level : this is handled by a dynamic group configuration group.share.isolation.level 
  • partition.assignment.strategy : share groups do not support client-side partition assignors
  • interceptor.classes : interceptors are not supported
  • protocol.type : this configuration is used to select the group protocol used for KafkaConsumer
  • session.timeout.ms : this is deprecated in KIP-848 and is not supported for share groups
  • heartbeat.interval.ms : this is deprecated in KIP-848 and is not supported for share groups

Kafka protocol changes

This KIP introduces the following new APIs:

  • ShareGroupHeartbeat - for consumers to form and maintain share groups
  • ShareGroupDescribe - for describing share groups
  • ShareFetch - for fetching records from share-partition leaders
  • ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
  • AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
  • DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
  • DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group
  • InitializeShareGroupState  - for initializing share-partition state on a share-coordinator
  • ReadShareGroupState - for reading share-partition state from a share coordinator

  • WriteShareGroupState - for writing share-partition state to a share coordinator

  • DeleteShareGroupState - for deleting share-partition state from a share coordinator

  • ReadShareGroupOffsetsStateReadShareGroupStateSummary  - for reading the offsets from the sharea summary of the share-partition state from a share coordinator

...

  • FindCoordinator  - for finding coordinators, to support share coordinatorsListGroups  - for listing groups, to support listing share groups

Access control

This table gives the ACLs required for the new APIs.

RPCOperationResource
ShareGroupHeartbeat ReadGroup
ShareGroupDescribe DescribeGroup
ShareFetch 

Read

Read

Group

Topic

ShareAcknowledge 

Read

Read

Group

Topic

AlterShareGroupOffsets ReadGroup
DeleteShareGroupOffsets ReadGroup
DescribeShareGroupOffsets DescribeGroup
InitializeShareGroupState ClusterActionCluster
ReadShareGroupState ClusterActionCluster
WriteShareGroupState ClusterActionCluster
DeleteShareGroupState ClusterActionCluster
ReadShareGroupStateOffsets ClusterActionCluster

Error codes

This KIP adds the following error codes the Kafka protocol.

  • INVALID_RECORD_STATE  (121) - The record state is invalid. The acknowledgement of delivery could not be completed.
  • SHARE_SESSION_NOT_FOUND  (122) - The share session is not found.
  • INVALID_SHARE_SESSION_EPOCH  (123) - The share session epoch is invalid.
  • FENCED_STATE_EPOCH  (124) - The share coordinator rejected the request because share-group state epoch did not match.

...

The KIP introduces version 56.

Request schema

Version 5 6 adds the new key type of FindCoordinatorRequest.CoordinatorType.SHARE with value 2 with the key of "group:topicId:partition".

Code Block
{
  "apiKey": 10,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "FindCoordinatorRequest",
  // Version 1 adds KeyType.
  //
  // Version 2 is the same as version 1.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds support for batching via CoordinatorKeys (KIP-699)
  //
  // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
  //
  // Version 6 adds support for share groups (KIP-932).
  "validVersions": "0-56",
  "deprecatedVersions": "0",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "Key", "type": "string", "versions": "0-3",
      "about": "The coordinator key." },
    { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
      "about": "The coordinator key type. (Group, transaction, share, etc.)" },
    { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
      "about": "The coordinator keys." }
  ]
}

Response schema

Version 5 6 is the same as version 45.

ListGroups API

This KIP introduces version 6 

Request schema

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

Request schema

The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeatVersion 6 adds support for share groups.

Code Block
{
  "apiKey": 1676,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListGroupsRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the StatesFilter field (KIP-518).
  //
  // Version 5 adds the TypesFilter field (KIP-848).
  //
  // Version 6 adds support for share groups.
  ShareGroupHeartbeatRequest",
  "validVersions": "0-6",
  "flexibleVersions": "30+",
  "fields": [
    { "name": "StatesFilterGroupId", "type": "[]string", "versions": "40+", "entityType": "groupId",
      "about": "The states of the groups we want to list. If empty, all groups are returned with their stategroup identifier." },
    { "name": "TypesFilterMemberId", "type": "[]string", "versions": "50+",
      "about": "The member ID typesgenerated ofby the groupscoordinator. weThe wantmember toID list.must Ifbe empty,kept allduring groupsthe areentire returnedlifetime withof theirthe typemember." },
  ]
}

Response schema

Version 6 is the same as version 5.

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

Request schema

The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeat.

Code Block
{
  "apiKey": TBD,
     { "name": "MemberEpoch", "type": "requestint32",
  "listenersversions": ["broker0+"],
      "nameabout": "ShareGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
  The current member epoch; 0 to join the group; -1 to leave the group." },
    { "name": "GroupIdRackId", "type": "string", "versions": "0+",  "entityTypenullableVersions": "groupId0+",
        "aboutdefault": "The group identifier." }null",
      { "nameabout": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the membernull if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
      { "name": "MemberEpochSubscribedTopicNames", "type": "int32", "versions": "0+",
        "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
      { "name": "RackId", "type": "[]string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
        "about": "null if not provided or if it didn't change since the last heartbeat; the racksubscribed IDtopic of consumernames otherwise." },
  ]
}

Response schema

The group coordinator will only send the Assignment field when it changes.

Code Block
{
  "apiKey": 76,
 { "nametype": "RebalanceTimeoutMsresponse",
  "typename": "int32ShareGroupHeartbeatResponse",
  "versionsvalidVersions": "0+",
  "defaultflexibleVersions": -1"0+",
  // Supported errors:
  //  "about": "-1 if it didn't chance since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
  - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  "fields": [
    { "name": "SubscribedTopicNamesThrottleTimeMs", "type": "[]stringint32", "versions": "0+", "nullableVersions": "0+", "default
      "about": "null",
The duration in milliseconds for which the request "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwisewas throttled due to a quota violation, or zero if the request did not violate any quota." },
    ]
}

Response schema

The group coordinator will only send the Assignment field when it changes.

Code Block
{
  "apiKeyname": TBD"ErrorCode",
  "type": "responseint16",
  "nameversions": "ShareGroupHeartbeatResponse0+",
   "validVersions": "0",
  "flexibleVersionsabout": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  "fields": [The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "ThrottleTimeMsMemberId", "type": "int32string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The durationmember inID millisecondsgenerated for whichby the requestcoordinator. wasOnly throttledprovided due to a quota violation, or zero if when the requestmember didjoins notwith violateMemberEpoch any== quota0." },
    { "name": "ErrorCodeMemberEpoch", "type": "int16int32", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no errormember epoch." },
    { "name": "ErrorMessageHeartbeatIntervalMs", "type": "stringint32", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-levelheartbeat errorinterval message, or null if there was no errorin milliseconds." },
    { "name": "MemberIdAssignment", "type": "stringAssignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "Thenull memberif IDnot generatedprovided; bythe theassignment coordinatorotherwise. Only provided when the member joins with MemberEpoch == 0." },
   ", "fields": [
        { "name": "MemberEpochTopicPartitions", "type": "int32[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member epoch." },
    ]}
  ],
 { "namecommonStructs": "HeartbeatIntervalMs",[
    { "typename": "int32TopicPartitions", "versions": "0+",
      "aboutfields": "The heartbeat interval in milliseconds." },
[
        { "name": "AssignmentTopicId", "type": "Assignmentuuid", "versions": "0+",
 "nullableVersions": "0+", "default": "null",
      "about": "nullThe if not provided; the assignment otherwise.", "fields": [
        { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member." }
    ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic IDtopic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
    ]}
  ]
}

...

The ShareGroupDescribe API is used to describe share groups.

Request schema

Code Block
{
  "apiKey": NN77,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The ids of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
}

Response schema

Code Block
{
  "apiKey": NN77,
  "type": "response",
  "name": "ShareGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member rack ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The current member epoch." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
        "about": "The assigned topic-partitions to the member." }
    ]}
  ]
}

...

  • If acknowledgements are being made for a partition and no records should be fetched, PartitionMaxBytes  should be set to zero.
  • If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the Topics array with PartitionMaxBytes  set to zero AND the partition is included in ForgottenTopicsData .
  • If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the Topics  array and ShareSessionEpoch  is set to -1. No data will be fetched and it is not necessary to include the partition in ForgottenTopicsData .
  • If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the AcknowledgeErrorCode  in the response will be set to the same value for all partitions which had piggybacked acknowledgements.

Request schema

For the AcknowledgementBatches of each topic-partition, the BaseOffsets FirstOffsets  must be ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

Code Block
{
  "apiKey": NN78,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "BaseOffsetFirstOffset", "type": "int64", "versions": "0+",
            "about": "BaseFirst offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN78,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors for ErrorCode and AcknowledgeErrorCode:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top-level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
     { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The fetch error code, or 0 if there was no fetch error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The fetch error message, or null if there was no fetch error." },
        { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no acknowledge error." },
        { "name": "AcknowledgeErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The acknowledge error message, or null if there was no acknowledge error." }, 
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "BaseOffsetFirstOffset", "type":  "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

...

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

For the AcknowledgementBatches of each topic-partition, the BaseOffsets FirstOffsets  must be ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

Code Block
{
  "apiKey": NN79,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "BaseOffsetFirstOffset", "type": "int64", "versions": "0+",
            "about": "BaseFirst offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN79,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

...

The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The share-partition leader handles group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "AlterShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to alter offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
        "about": "Each partition to alter offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "AlterShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The share-partition leader handles group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to delete offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",]}
        "about": "The partitions." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeleteShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DeleteShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

DescribeShareGroupOffsets API

The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The share-partition leader handles group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to describe offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset."},
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitializeShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]InitializeStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch forof thisthe share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being initialized." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "InitializeShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]InitializeStateResult", "versions": "0+",
      "about": "The initialization results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKeyname": NN"LeaderEpoch",
  "type": "responseint32",
  "nameversions": "ReadShareGroupStateResponse0+",
          "about", "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": ""validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch forof thisthe share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[
          { "name": "BaseOffsetFirstOffset", "type": "int64", "versions": "0+",
            "about": "The basefirst offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

...

The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for thisof the share-partition." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being written." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "BaseOffsetFirstOffset", "type": "int64", "versions": "0+",
            "about": "The basefirst offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]WriteStateResult", "versions": "0+",
      "about": "The write results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
      "about": "The delete results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

ReadShareGroupStateSummary API

The ReadShareGroupOffsetsState ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the offset information from share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupOffsetsStateRequestReadShareGroupStateSummaryRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadOffsetsStateDataReadStateSummaryData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
       ] { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupOffsetsStateResponseReadShareGroupStateSummaryResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadOffsetsStateResultReadStateSummaryResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch forof thisthe share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

...

This section describes the new record types.

Group and assignment metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets  topic.

With the exception of the ShareGroupStatePartitionMetadata record, all of the record types are analogous to those introduced in KIP-848 for consumer groups.

For each share group, a single ConsumerGroupMetadata ShareGroupMetadata record is written for the group epoch, and a ShareGroupPartitionMetadata record is written for the topic-partitions being assigned by the group. When the group is deleted, a tombstone record is records are written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.

For each share group, a ShareGroupTargetAssignmentMetadata record is written to record the group epoch used to compute the assignment. For each member, there is a ShareGroupTargetAssignmentMember record which persists the target assignment, and a ShareGroupCurrentMemberAssignment record which persists the current assignment and is also used to keep track of the member epoch.

There is also a ShareGroupPartitionMetadata ShareGroupStatePartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.

ConsumerGroupMetadataKey

...

ShareGroupMetadataKey

Code Block
{
  "type": "data",
  "name": "ConsumerGroupMetadataKeyShareGroupMetadataKey",
  "validVersions": "311",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "311",
      "about": "The group id." }
  ]
}

ConsumerGroupMetadataValue

A new version of the record value is introduced contains the Type  field. For a share group, the type will be "share" . For a consumer group, the type is the default value of 0. The values are from the ordinal values of the enumeration org.apache.kafka.coordinator.Group.GroupType .

ShareGroupMetadataValue

Code Block
Code Block
{
  "type": "data",
  "name": "ConsumerGroupMetadataValueShareGroupMetadataValue",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
  ]
}

ShareGroupPartitionMetadataKey

Code Block
{
  // Version 1 adds Type field
    { "name"type": "data",
  "name": "ShareGroupPartitionMetadataKey",
  "validVersions": "9",
  "flexibleVersions": "Typenone",
  "fields": [
    { "typename": "int8GroupId", "versionstype": "1+string", "defaultversions": "09",
      "about": "The group type - 0:consumer, 1:classic, 3:shareid." }
  ]
}

...

ShareGroupPartitionMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataKeyShareGroupPartitionMetadataValue",
  "validVersions": "100",
  "flexibleVersions": "none0+",
  "fields": [
    { "name": "GroupIdTopics", "typeversions": "string0+", "versionstype": "10[]TopicMetadata",
      "about": "The list of grouptopic idmetadata.", "fields": },[
      { "name": "MemberIdTopicId", "typeversions": "string0+", "versionstype": "10uuid",
        "about": "The membertopic id." },
  ]
}

ShareGroupMemberMetadataValue

Code Block
{
  "type": "data",
 { "name": "ShareGroupMemberMetadataValueTopicName",
  "validVersionsversions": "0+",
  "flexibleVersionstype": "0+string",
  "fields      "about": [
"The topic name." },
      { "name": "RackIdNumPartitions", "versions": "0+", "nullableVersionstype": "0+int32",
 "type": "string",
      "about": "The (optional) rack id number of partitions of the topic." },
      { "name": "ClientIdPartitionMetadata", "versions": "0+", "type": "string[]PartitionMetadata",
        "about": "The client id." },
Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [
        { "name": "ClientHostPartition", "versions": "0+", "type": "stringint32",
          "about": "The clientpartition hostnumber." },
        { "name": "SubscribedTopicNamesRacks", "versions": "0+", "type": "[]string",
          "about": "The listset of subscribedracks topicthat names." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "The rebalance timeout" the partition is mapped to." }
      ]}
    ]}
  ]
}

...

ShareGroupMemberMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataKeyShareGroupMemberMetadataKey",
  "validVersions": "910",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "910",
      "about": "The group id." },
    ]
}

ShareGroupPartitionMetadataValue

Code Block
{
  "typename": "dataMemberId",
  "nametype": "ShareGroupPartitionMetadataValuestring",
  "validVersionsversions": "010",
      "flexibleVersionsabout": "The member id." }
  ]
}

ShareGroupMemberMetadataValue

Code Block
{0+",
  "fieldstype": ["data",
  "name": "ShareGroupMemberMetadataValue",
 { "namevalidVersions": "Epoch0",
  "versionsflexibleVersions": "0+",
  "typefields": "int32" },[
    { "name": "InitializedTopicsRackId", "versions": "0+", "typenullableVersions": "[]TopicMetadata" },
    { "name0+", "type": "InitializingTopicsstring",
      "versionsabout": "0+", "type": "[]TopicIndexMetadataThe (optional) rack id." },
    { "name": "DeletingTopicsClientId", "versions": "0+", "type": "[]TopicMetadata" }
  ],
  "commonStructs": [
string",
     { "nameabout": "TopicMetadata", "versions": "0+", "fields": [
  The client id." },
    { "name": "TopicIdClientHost", "typeversions": "uuid0+", "versionstype": "0+string",
        "about": "The topicclient identifierhost." },
      { "name": "NumPartitionsSubscribedTopicNames", "typeversions": "int320+", "versionstype": "0+[]string",
        "about": "The numberlist of partitionssubscribed topic names." }
  ]
}

ShareGroupTargetAssignmentMetadataKey

Code Block
{
   ]}"type": "data",
  "name": "ShareGroupTargetAssignmentMetadataKey",
 { "namevalidVersions": "TopicIndexMetadata12",
  "versionsflexibleVersions": "0+none",
  "fields": [
      { "name": "TopicIdGroupId", "versionstype": "0+string", "typeversions": "uuid12" },
      "about": "The group id." }
  ]
}

ShareGroupTargerAssignmentMetadataValue

Code Block
{
  "type": "data",
  "name": "StartPartitionIndexShareGroupTargetAssignmentMetadataValue",
  "versionsvalidVersions": "0+",
  "typeflexibleVersions": "int320+" },
  "fields": [
    { "name": "EndPartitionIndexAssignmentEpoch", "versions": "0+", "type": "int32" }
    ],
      "about": "The assignment epoch." }
  ]
}

The InitializingTopics  field is used as the first stage of a two-stage process to initialize the persistent state for a set of share-partitions. When the share coordinator successfully responds to InitializeShareGroupState  , the topic-partitions are moved into the InitializedTopics  field.

In a similar way, the DeletingTopics  field is used as the first stage of a two-stage process to delete the persistent state for a set of share-partitions.

Share-group state

These records are written by the share coordinator on the __share_group_state  topic.

ShareSnapshotKey

ShareGroupTargetAssignmentMemberKey

Code Block
{
  "type": "data",
  "name": "ShareSnapshotKeyShareGroupTargetAssignmentMemberKey",
  "validVersions": "013",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "013",
      "about": "The group id." },
    { "name": "TopicIdMemberId", "type": "uuidstring", "versions": "013",
      "about": "The topicmember id." }
  },]
}

ShareGroupTargetAssignmentMemberValue

Code Block
{
  "type": "data",
 { "name": "PartitionShareGroupTargetAssignmentMemberValue",
  "typevalidVersions": "int320",
  "versionsflexibleVersions": "0+",
  "fields": [
   "The partition{ index." }
  ]
}

ShareSnapshotValue

Code Block
{
  "type"name": "dataTopicPartitions",
  "nameversions": "ShareSnapshotValue0+",
  "validVersionstype": "0[]TopicPartition",
      "flexibleVersionsabout": "0+The assigned partitions.",
  "fields": [
      { "name": "SnapshotEpochTopicId", "type": "uint16", "versions": "0+",
      "abouttype": "The snapshot epoch.uuid" },
      { "name": "StartOffsetPartitions", "typeversions": "int640+", "versionstype": "[]int32" }
    ]}
  ]
}

ShareGroupCurrentMemberAssignmentKey

Code Block
{
0",
      "abouttype": "The share-partition start offset." }data",
    { "name": "StateBatchesShareGroupCurrentMemberAssignmentKey",
  "typevalidVersions": "[]StateBatch14",
  "versionsflexibleVersions": "0none",
  "fields": [
      { "name": "BaseOffsetGroupId", "type": "int64string", "versions": "014",
        "about": "The base offset of this state batchgroup id." },
      { "name": "LastOffsetMemberId", "type": "int64string", "versions": "014",
        "about": "The last offset of this state batchmember id." },
  ]
}

ShareGroupCurrentMemberAssignmentValue

Code Block
{
  "type": "data",
 { "name": "DeliveryStateShareGroupCurrentMemberAssignmentValue",
  "typevalidVersions": "int80",
  "versionsflexibleVersions": "0+",
  "fields": [
    { "aboutname": "MemberEpoch"The delivery state - 0:Available,2:Acked,4:Archived, "versions": "0+", "type": "int32",
      "about": "The current member epoch that is expected from the member in the heartbeat request." },
      { "name": "DeliveryCountPreviousMemberEpoch", "typeversions": "int160+", "versionstype": "0int32",
        "about": "The delivery count." }
    ]} 
  ]
}

ShareUpdateKey

Code Block
{
  "type": "data",
  "name": "ShareUpdateKey",
  "validVersions": "1",
  "flexibleVersions": "none",
  "fields": [If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
    { "name": "State", "versions": "0+", "type": "int8",
      "about": "The member state. See MemberState for the possible values." },
    { "name": "GroupIdAssignedPartitions", "typeversions": "string0+", "versionstype": "0[]TopicPartitions",
      "about": "The group id partitions assigned to (or owned by) this member." },
   ],
 { "namecommonStructs": "TopicId", [
    { "typename": "uuidTopicPartitions", "versions": "0+",
      "aboutfields": "The topic id." },
[
      { "name": "PartitionTopicId", "type": "int32uuid", "versions": "0+",
      "The partition index "about": "The topic Id." },
      ]
}

ShareUpdateValue

Code Block
{
  "typename": "dataPartitions",
  "nametype": "ShareUpdateValue[]int32",
  "validVersionsversions": "0+",
        "flexibleVersionsabout": "0+",
The partition "fieldsIds.": [}
    ]}
  ]
}

ShareGroupStatePartitionMetadataKey

Code Block
{
  "nametype": "SnapshotEpochdata",
  "typename": "uint16ShareGroupStatePartitionMetadataKey",
  "versionsvalidVersions": "015",
      "aboutflexibleVersions": "none"The,
 snapshot epoch."fields": },[
    { "name": "StartOffsetGroupId", "type": "int64string", "versions": "015",
      "about": "The group id." }
  ]
}

ShareGroupStatePartitionMetadataValue

Code Block
{
  "type": "data",
 share-partition start offset, or -1 if the start offset is not being updated." },
    { "name": "StateBatchesShareGroupPartitionMetadataValue",
  "typevalidVersions": "[]StateBatch0",
  "versionsflexibleVersions": "0+",
  "fields": [
      { "name": "BaseOffsetInitializedTopics", "typeversions": "int640+", "versionstype": "0[]TopicPartitionsInfo",
        "about": "The basetopics offsetwith ofinitialized thisshare-group state batch." },
      { "name": "LastOffsetDeletingTopics", "typeversions": "int640+", "versionstype": "0[]TopicInfo",
        "about": "The topics lastwhose offsetshare-group ofstate thisis statebeing batchdeleted." },
  ],
    "commonStructs": [
    { "name": "DeliveryStateTopicPartitionsInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "int8uuid", "versions": "0+",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived"  topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "DeliveryCountTopicId", "type": "int16uuid", "versions": "0+",
        "about": "The delivery count." }
    ]} 
  ]
}

Metrics

Broker Metrics

The following new broker metrics should be added:

": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." }
    ]}
  ]
}

Share-group state

These records are written by the share coordinator on the __share_group_state  topic.

ShareSnapshotKey

Code Block
{
  "type": "data",
  "name": "ShareSnapshotKey",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." }
  ]
}

ShareSnapshotValue

Code Block
{
  "type": "data",
  "name": "ShareSnapshotValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch of the share-partition." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
      "about": "The leader epoch of the share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}

ShareUpdateKey

Code Block
{
  "type": "data",
  "name": "ShareUpdateKey",
  "validVersions": "1",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "1",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "1",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "1",
      "about": "The partition index." }
  ]
}

ShareUpdateValue

Code Block
{
  "type": "data",
  "name": "ShareUpdateValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
      "about": "The leader epoch of the share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset, or -1 if the start offset is not being updated." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}

Metrics

Broker metrics

The following new broker metrics should be added:

Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The total number of share group rebalances count and rate.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 

kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

group-countGaugegroup-coordinator-metrics

protocol: share

state: {Empty|Stable|Dead} 

The number of share groups in respective state.kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={Empty|Stable|Dead} 

share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)

Meter

share-group-metrics


The number of acknowledgement requests.

kafka.server:type=share-group-metrics,name=share-acknowledgement-rate

kafka.server:type=share-group-metrics,name=share-acknowledgement-count 

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

share-group-metrics

ack-type:{Accept,Release,Reject} 


The number of records acknowledged per acknowledgement type.

kafka.server:type=share-group-metrics,name=record-acknowledgement-rate,ack-type={Accept,Release,Reject} 

kafka.server:type=share-group-metrics,name=record-acknowledgement-count,ack-type={Accept,Release,Reject} 

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-group-metrics


The time taken to load the share partitions.

kafka.server:type=share-group-metrics,name=partition-load-time-avg

kafka.server:type=share-group-metrics,name=partition-load-time-max  

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-coordinator-metrics


The time taken in milliseconds to load the share-group state from the share-group state partitions.

kafka.server:type=share-coordinator-metrics,name=partition-load-time-avg 

kafka.server:type=share-coordinator-metrcs,name=partition-load-time-max 

thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg)

Meter

share-coordinator-metrics


The fraction of time the share coordinator thread is idle.

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-min 

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-avg 

write (write-rate and write-total)

Meter

share-coordinator-metrics


The number of share-group state write calls per second.

kafka.server:type=share-coordinator-metrics,name=write-rate 

kafka.server:type=share-coordinator-metrics,name=write-total 

write-latency (write-latency-avg and write-latency-max)

Meter

share-coordinator-metrics


The time taken for a share-group state write call, including the time to write to the share-group state topic.

kafka.server:type=share-coordinator-metrics,name=write-latency-avg 

kafka.server:type=share-coordinator-metrics,name=write-latency-max 

num-partitions

Gauge

share-coordinator-metrics


The number of partitions in the share-state topic.

kafka.server:type=share-coordinator-metrics,name=num-partitions 

The group coordinator uses metrics in the group group-coordinator-metrics . The share-partition leader uses metrics in the group share-group-metrics . The share coordinator uses metrics in the group share-coordinator-metrics .

Client metrics

The following new client metrics should be added. Following the terminology in KIP-714, these are standard metrics, as opposed to required metrics.

Metric Name

Type

Group

Tags

Description

JMX Bean

Telemetry metric name (KIP-714)

last-poll-seconds-ago

Gauge

consumer-share-metrics

client-id 

The number of seconds since the last poll() invocation.

kafka.consumer:type=consumer-share-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+) 

org.apache.kafka.consumer.share.last.poll.seconds.ago 

time-between-poll-avg

Meter

consumer-share-metrics

client-id 

The average delay between invocations of poll() in milliseconds.

kafka.consumer:type=consumer-share-metrics,name=time-between-poll-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.time.between.poll.avg 

time-between-poll-max

Meter

consumer-share-metrics

client-id 

The maximum delay between invocations of poll() in milliseconds.

kafka.consumer:type=consumer-share-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+) 

o.a.k.consumer.share.time.between.poll.max 

poll-idle-ratio-avg

Meter

consumer-share-metrics

client-id 

The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records.

kafka.consumer:type=consumer-share-metrics,name=poll-idle-ratio-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.poll.idle.ratio.avg 

heartbeat-response-time-max

Meter

consumer-share-coordinator-metrics

client-id 

The maximum time taken to receive a response to a heartbeat request in milliseconds.

kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-response-time-max,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.heartbeat.response.time.max 

heartbeat-rate

Meter

consumer-share-coordinator-metrics

client-id 

The number of heartbeats per second.

kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.heartbeat.rate 

heartbeat-total

Meter

consumer-share-coordinator-metrics

client-id 

The total number of heartbeats.

kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-total,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.heartbeat.total 

last-heartbeat-seconds-ago

Gauge

consumer-share-coordinator-metrics

client-id 

The number of seconds since the last coordinator heartbeat was sent.

kafka.consumer:type=consumer-share-coordinator-metrics,name=last-heartbeat-seconds-ago,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.last.heartbeat.seconds.ago 

rebalance-total

Meter

consumer-share-coordinator-metrics

client-id 

The total number of rebalance events.

kafka.consumer:type=consumer-share-coordinator-metrics,name=rebalance-total,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.rebalance.total 

rebalance-rate-per-hour

Meter

consumer-share-coordinator-metrics

client-id 

The number of rebalance events per hour.

kafka.consumer:type=consumer-share-coordinator-metrics,name=rebalance-rate-per-hour,client-id=([-.\w]+) 

o.a.k.consumer.share.coordinator.rebalance.rate.per.hour 

fetch-size-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of bytes fetched per request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-size-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.size.avg 

fetch-size-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum number of bytes fetched per request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-size-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.size.max 

bytes-fetched-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of bytes fetched per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=bytes-fetched-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.bytes.fetched.rate 

bytes-fetched-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of bytes fetched.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=bytes-fetched-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.bytes.fetched.total 

records-per-request-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of records in each request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-per-request-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.per.request.avg 

records-per-request-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum number of records in a request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-per-request-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.per.request.max 

records-fetched-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of records fetched per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-fetched-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.fetched.rate 

records-fetched-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of records fetched.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-fetched-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.records.fetched.total 

acknowledgements-send-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of record acknowledgements sent per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-send-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.send.rate 

acknowledgements-send-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of record acknowledgements sent.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-send-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.send.total 

acknowledgements-error-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The average number of record acknowledgements that resulted in errors per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-error-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.error.rate 

acknowledgements-error-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of record acknowledgements that resulted in errors.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-error-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.acknowledgements.error.total 

fetch-latency-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average time taken for a fetch request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-latency-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.latency.avg 

fetch-latency-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum time taken for any fetch request.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-latency-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.latency.max 

fetch-rate

Meter

consumer-share-fetch-manager-metrics

client-id 

The number of fetch requests per second.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-rate,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.rate 

fetch-total

Meter

consumer-share-fetch-manager-metrics

client-id 

The total number of fetch requests.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-total,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.total 

fetch-throttle-time-avg

Meter

consumer-share-fetch-manager-metrics

client-id 

The average throttle time in milliseconds.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-throttle-time-avg,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.throttle.time.avg 

fetch-throttle-time-max

Meter

consumer-share-fetch-manager-metrics

client-id 

The maximum throttle time in milliseconds.

kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-throttle-time-max,client-id=([-.\w]+) 

o.a.k.consumer.share.fetch.manager.fetch.throttle.time.

Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The total number of share group rebalances count and rate.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 

kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

num-partitions

Gauge

group-coordinator-metrics

protocol: share

The number of share partitions managed by group coordinator. 

kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share 

group-countGaugegroup-coordinator-metrics

protocol: share

state: {empty|stable|dead} 

The number of share groups in respective state.kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty|stable|dead} 

share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

The total number of offsets acknowledged for share groups.

kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-rate,protocol=share

kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-count,protocol=share 

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

ack-type:{accept,release,reject} 

The number of records acknowledged per acknowledgement type.

kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-rate,protocol=share,ack-type={accept,release,reject} 

kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-count,protocol=share,ack-type={accept,release,reject} 

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

group-coordinator-metrics

protocol: share 

The time taken to load the share partitions.

kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg,protocol=share 

kafka.server:type=group-coordinator-metrics,name=partition-load-time-max,protocol=share  

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-coordinator-metrics

The time taken in milliseconds to load the share-group state from the share-group state partitions loaded in the last 30 seconds.

kafka.server:type=share-coordinator-metrics,name=partition-load-time-avg 

kafka.server:type=share-coordinator-metrcs,name=partition-load-time-max 

thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg)

Meter

share-coordinator-metrics

The fraction of time the share coordinator thread is idle.

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-min 

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-avg 

write (write-rate and write-total)

Meter

share-coordinator-metrics

The number of share-group state write calls per second.

kafka.server:type=share-coordinator-metrics,name=write-rate 

kafka.server:type=share-coordinator-metrics,name=write-total 

write-latency (write-latency-avg and write-latency-total)

Meter

share-coordinator-metrics

The time taken for a share-group state write call.

kafka.server:type=share-coordinator-metrics,name=write-latency-avg 

kafka.server:type=share-coordinator-metrics,name=write-latency-

max 

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

...

For topics in which share groups are the only consumption model, it would be nice to be able to have the SPSO of the share-partitions taken in to consideration when cleaning the log and advancing the log start offset.

It would also be possible to have share-group configuration to control the maximum time-to-live for records and automatically archive them at this time.

Finally, this KIP does not include support for acknowledging delivery using transactions for exactly-once semantics. Conceptually, this is quite straightforward but would take changes to the API.

Compatibility, Deprecation, and Migration Plan

Kafka Broker Migration

cleaning the log and advancing the log start offset.

It would also be possible to have share-group configuration to control the maximum time-to-live for records and automatically archive them at this time.

Finally, this KIP does not include support for acknowledging delivery using transactions for exactly-once semantics. Conceptually, this is quite straightforward but would take changes to the API.

Compatibility, Deprecation, and Migration Plan

Kafka Broker Migration

As is customary for large KIPs, this KIP will be delivered into Apache Kafka progressively, starting with an Early Access release, and then moving through Preview, and finally General Availability.

Early access and Preview

At these stages, KIP-932 can be used for familiarization and experimentation, but not production use. It is disabled in the default configuration for the cluster, and must be explicitly enabled. Doing so is not appropriate in a production cluster.

To turn on the feature, add "share"  to the group.coordinator.rebalance.protocols  configuration. There is no support for upgrade or downgrade.

General availability

This KIP builds upon KIP-848 and the new group coordinator.

To upgrade a cluster, it is first necessary to perform a rolling upgrade of the cluster to a software version which supports share groups. Then, the new protocol is enabled using the kafka-feature.sh  tool by setting a group.version  which supports share groups. Finally, the group.coordinator.rebalance.protocols  configuration is changed to add "share"  to the list of enabled rebalance protocols.

KIP-848 This KIP builds upon KIP-848 which introduced the new group coordinator and the new records for the __consumer_offsets  topic. The pre-KIP-848 group coordinator will not recognize the new records, so this downgrade is not supported.

Downgrading to a software version that supports the new group coordinator but does not support share groups is supported. This KIP-932 adds a new version for the ConsumerGroupMetadataValue  record to include the group type. If the software version does not understand the v1 record type, it will assume the records apply to a consumer group of the same name. We should make sure this is a harmless situation.More information need to be added here based on the share-partition persistence mechanism. Details are still under consideration hererecords to the __consumer_offsets  topic which will not be understood by the group coordinator. The group coordinator will ignore these records. The __share_group_state  topic will be unused because there will be no share coordinator and can be manually deleted.

Test Plan

The feature will be thoroughly tested with unit, integration and system tests. We will also carry out performance testing both to understand the performance of share groups, and also to understand the impact on brokers with this new feature.

...