Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/9wdxthfsbm5xf01y4xvq6qtlg0gq96lq

...

The following diagram illustrates how these pieces are wired together using the new Kafka protocol RPCs introduced by this KIP.


Relationship with consumer groups

Consumer groups and share groups exist in the same namespace in a Kafka cluster. As a result, if there’s a consumer group with a particular name, you cannot create a share group with the same name, and vice versa. However, consumer groups and share groups are quite different in terms of use, so attempts to perform operations for one kind of group on a group of the incorrect type will fail with a GroupIdNotFoundException. The new AdminClient.listGroups method gives a way of listing groups of all types.

Because consumer groups and share groups are both created automatically on first use, the type of group that is created depends upon how the group ID was first used. For production use, choosing a naming convention for groups in advance and using this configuration to enforce the group type is recommended.

A future KIP is planned for the administration of groups, encompassing aspects such as administrative creation of groups, and listing groups of all types including consumer groups, share groups and others such as groups used by Kafka Connect.

Share group membership

This KIP builds upon the new consumer group protocol in KIP-848: The Next Generation of the Consumer Rebalance Protocol.

...

Whenever the group epoch is larger than the target assignment epoch, the group coordinator triggers the computation of a new target assignment based on the latest group metadata using a server-side assignor. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.

...

Share groups do not have the ASSIGNING state because only server-side assignors are supported, and do not need the RECONCILING state because there’s no need for all members to converge before the group enters the STABLE state. There is no automatic expiration of share groups.

  • EMPTY - When a share group is created or the last member leaves the group, the share group is EMPTY.
  • STABLE - When a share group has active members, the share group is STABLE.
  • DEAD - When the share group remains EMPTY for a configured period, the group coordinator transitions it to DEAD to delete it. This only happens if the group does not have any persistent share-group state. Share groups are intentionally more durable than consumer groups.

...

For a share group, the group coordinator persists seven kinds of records:

  • ShareGroupMetadata - this is written to reserve the group's ID as a share group in the namespace of groups.
  • ShareGroupMemberMetadata - this is written to allow group membership to continue seamlessly across a group coordinator change.
  • ShareGroupPartitionMetadata - this is written to persist the metadata about the partitions the group is assigning to the members.
  • ShareGroupTargetAssignmentMetadata - this is written to persist the assignment epoch.
  • ShareGroupTargetAssignmentMember - this is written to persist the target assignment for a member.
  • ShareGroupCurrentMemberAssignment - this is written to persist the current assignment for a member, and also to maintain the sequence of member epochs.
  • ShareGroupStatePartitionMetadata - this is written whenever the set of topic-partitions being consumed in the share group changes. Its purpose is to keep track of which topic-partitions will have share-group persistent state.

When the group coordinator fails over, the newly elected coordinator replays the state from the __consumer_offsets partition. This means a share group will remain in existence across the fail-over, along with the list of members and their assignments.

...

This KIP introduces org.apache.kafka.coordinator.group.share.SimpleAssignor. It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records. When calculating a new target assignment, the assignor is aware of the current assignment (assuming the group coordinator hasn't changed) and can use this to influence the calculation of the new target assignment. The calculation proceeds as follows:

  1. The assignor hashes the member IDs of the members and maps the partitions assigned to the members based on the hash. This gives approximately even balance.

  2. If any partitions were not assigned any members by (1) and do not have members already assigned in the current assignment, members are assigned round-robin until each partition has at least one member assigned to it.

  3. If any partitions were assigned members by (1) and also have members in the current assignment assigned by (2), the members assigned by (2) are removed.
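The three steps can be illustrated for a single topic with the following Java sketch. It is not the SimpleAssignor source. The class names SimpleAssignorSketch and SketchAssignment are hypothetical, and String.hashCode is an assumed stand-in for whatever hash the real assignor uses. The sketch records which members were placed by round-robin so that step (3) can drop them on the next calculation.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical result holder: members placed by hashing (step 1) and by round-robin (step 2). */
final class SketchAssignment {
    final Map<Integer, Set<String>> byHash = new TreeMap<>();
    final Map<Integer, Set<String>> byRoundRobin = new TreeMap<>();

    /** Union of both steps, i.e. the target assignment for the topic. */
    Map<Integer, Set<String>> combined() {
        Map<Integer, Set<String>> all = new TreeMap<>();
        byHash.forEach((p, m) -> all.computeIfAbsent(p, k -> new TreeSet<>()).addAll(m));
        byRoundRobin.forEach((p, m) -> all.computeIfAbsent(p, k -> new TreeSet<>()).addAll(m));
        return all;
    }
}

/** Hypothetical sketch of the three assignment steps for one topic; not the actual SimpleAssignor. */
final class SimpleAssignorSketch {
    static SketchAssignment assign(List<String> members, int numPartitions, SketchAssignment current) {
        SketchAssignment target = new SketchAssignment();
        if (members.isEmpty()) return target;
        Set<String> alive = new HashSet<>(members);

        // Step (1): map each member to one partition using a hash of its member ID
        // (String.hashCode is an assumption; the real hash function may differ).
        for (String member : members) {
            int p = Math.floorMod(member.hashCode(), numPartitions);
            target.byHash.computeIfAbsent(p, k -> new TreeSet<>()).add(member);
        }

        int next = 0;
        for (int p = 0; p < numPartitions; p++) {
            // Step (3): a partition that received hashed members loses its round-robin
            // members, which happens here by simply not carrying them over.
            if (target.byHash.containsKey(p)) continue;
            // Step (2): keep current round-robin members that are still in the group...
            Set<String> kept = new TreeSet<>();
            for (String m : current.byRoundRobin.getOrDefault(p, Set.of())) {
                if (alive.contains(m)) kept.add(m);
            }
            // ...otherwise hand out members round-robin.
            if (kept.isEmpty()) {
                kept.add(members.get(next % members.size()));
                next++;
            }
            target.byRoundRobin.put(p, kept);
        }
        return target;
    }
}
```

In this sketch, with members a and b and four partitions, hashing places a on partition 1 and b on partition 2, and round-robin fills partitions 0 and 3. If members c and d then join and hash onto partitions 3 and 0, the round-robin members are removed from those partitions. This is the revocation described in the next paragraph.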

When the number of members is greater than or equal to the number of partitions for a subscribed topic and every partition was assigned at least one member, adding more members to the group simply assigns more members to the partitions. If, however, any partitions were not assigned any members by hashing, adding more members changes the distribution and will revoke some partitions previously assigned. Because assignments in share groups are looser than they are in consumer groups, the group coordinator doesn't require the members to confirm they have revoked partitions before they can be assigned to other members. For a brief period while the rebalancing is occurring, some partitions may have more members consuming than the assignment requires, but this situation soon resolves as the members receive their updated assignments and stop fetching from the revoked partitions.

...

In order to retain similarity with KafkaConsumer and make it easy for applications to move between the two interfaces, KafkaShareConsumer follows the same threading rules as KafkaConsumer. It is not thread-safe, and only one thread at a time may call the methods of KafkaShareConsumer. Unsynchronized access will result in ConcurrentModificationException. The only exception to this rule is KafkaShareConsumer.wakeup(), which may be called from any thread.
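The single-thread rule is typically enforced with a lightweight ownership check rather than a lock. The sketch below shows one way to do this, similar in spirit to the private acquire/release checks inside KafkaConsumer. The class SingleThreadGuard and its methods are hypothetical and are not part of the KafkaShareConsumer API. Every public method would call acquire() and release(), while wakeup() only sets a flag, which is why it is safe to call from any thread.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical single-thread guard (not part of the KafkaShareConsumer API).
 * Each public consumer method would wrap its body in acquire()/release();
 * wakeup() deliberately skips the guard so that any thread may call it.
 */
final class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    // Counts nested acquire() calls made by the owning thread.
    private final AtomicInteger refCount = new AtomicInteger(0);
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);

    void acquire() {
        long threadId = Thread.currentThread().getId();
        // Succeeds if this thread already owns the guard, or if nobody owns it.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    void release() {
        // The last release gives up ownership so another thread may take over.
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    /** Thread-safe: only records the request for the polling thread to observe. */
    void wakeup() {
        wakeupRequested.set(true);
    }

    /** Called by the owning thread, for example inside poll(); clears the request. */
    boolean consumeWakeup() {
        return wakeupRequested.getAndSet(false);
    }
}
```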

...

The share coordinator will prefer to write a snapshot over an update (for example, when the SPSO moves and there are no in-flight records, the snapshot will be small and there’s no need to write an update instead). The share coordinator will take a snapshot periodically, frequently enough to minimise the number of ShareUpdate records to replay but rarely enough to minimise the performance cost of taking snapshots.
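One way to express the snapshot-versus-update preference is as a simple rule: write a snapshot when there are no in-flight records, or when too many updates have accumulated since the last snapshot. The sketch below is hypothetical. The KIP does not specify the threshold, and maxUpdatesBetweenSnapshots is an illustrative parameter, not a Kafka configuration.

```java
/** Hypothetical rule for choosing between a ShareSnapshot and a ShareUpdate record. */
final class SnapshotPolicy {
    enum RecordKind { SNAPSHOT, UPDATE }

    // Illustrative knob only; not a documented Kafka configuration.
    private final int maxUpdatesBetweenSnapshots;
    private int updatesSinceSnapshot = 0;

    SnapshotPolicy(int maxUpdatesBetweenSnapshots) {
        this.maxUpdatesBetweenSnapshots = maxUpdatesBetweenSnapshots;
    }

    RecordKind nextRecord(int inFlightBatches) {
        // With nothing in flight the snapshot is small, so prefer it. Otherwise take a
        // snapshot once enough updates have piled up to bound the replay cost.
        if (inFlightBatches == 0 || updatesSinceSnapshot >= maxUpdatesBetweenSnapshots) {
            updatesSinceSnapshot = 0;
            return RecordKind.SNAPSHOT;
        }
        updatesSinceSnapshot++;
        return RecordKind.UPDATE;
    }
}
```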

There are two kinds of fencing for share-group state.

The records also include a state epoch. This is used to ensure that all of the components involved are aligned on the current state, and to fence any calls to write to an old version of the state. Whenever the share-group state is initialized, the state epoch is set to the share group's current group epoch. This gives a very simple way to make sure that reads and writes refer to the current version of the state.

The records also include a leader epoch. Whenever the share-partition leader calls the share coordinator, it provides the leader epoch of the partition in the request. The share coordinator uses this to fence zombie share-partition leaders. When a new leader is elected for a share-partition, the leader epoch of the partition is incremented. This means that the new leader will use a higher leader epoch in its requests to the share coordinator, and any trailing requests from earlier share-partition leaders can be rejected with FENCED_LEADER_EPOCH. The leader epoch is persisted by the share coordinator in its ShareSnapshot and ShareUpdate records. When a new state epoch is used, the leader epoch is initialized to -1, and it is properly initialized when a share-partition leader makes a request.
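The two fencing checks can be sketched as a single validation step in the share coordinator. This illustration rests on several assumptions. SharePartitionStateSketch and WriteOutcome are hypothetical names. FENCED_STATE_EPOCH is an assumed name for the state-epoch rejection, because the text above names only FENCED_LEADER_EPOCH. Persisting the epochs in ShareSnapshot and ShareUpdate records is omitted.

```java
/** Hypothetical outcomes; FENCED_STATE_EPOCH is an assumed name for the state-epoch rejection. */
enum WriteOutcome { OK, FENCED_STATE_EPOCH, FENCED_LEADER_EPOCH }

/** Hypothetical per-share-partition state held by the share coordinator. */
final class SharePartitionStateSketch {
    private int stateEpoch;
    private int leaderEpoch = -1;   // -1 until a share-partition leader makes a request

    SharePartitionStateSketch(int stateEpoch) {
        this.stateEpoch = stateEpoch;
    }

    /** Initializing the state starts a new state epoch and resets the leader epoch. */
    void initialize(int newStateEpoch) {
        stateEpoch = newStateEpoch;
        leaderEpoch = -1;
    }

    WriteOutcome write(int requestStateEpoch, int requestLeaderEpoch) {
        // A write aimed at an older version of the state is fenced.
        if (requestStateEpoch < stateEpoch) return WriteOutcome.FENCED_STATE_EPOCH;
        // A trailing request from an earlier (zombie) share-partition leader is fenced.
        if (requestLeaderEpoch < leaderEpoch) return WriteOutcome.FENCED_LEADER_EPOCH;
        leaderEpoch = requestLeaderEpoch;
        return WriteOutcome.OK;
    }

    int leaderEpoch() {
        return leaderEpoch;
    }
}
```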

The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):

...

The group coordinator is responsible for asking the share coordinator to initialize and delete the durable share-partition state. The group coordinator uses the ShareGroupStatePartitionMetadata record to keep track of which share-partitions have been initialized.

...

  • When a topic is added to the set of subscribed topics for a share group and is not yet in the ShareGroupStatePartitionMetadata record
  • When partitions are added to a topic which is already in the ShareGroupStatePartitionMetadata record
  • When KafkaAdmin.alterShareGroupOffsets is used to reset the SPSO for a share-partition

The state is deleted in the following cases:

  • When a topic in the ShareGroupStatePartitionMetadata record is deleted
  • When KafkaAdmin.deleteShareGroupOffsets is used to delete the state for a topic, most likely as a result of an administrator cleaning up a topic which is no longer in use by this share group
  • When the share group is deleted

The group coordinator periodically reconciles its "hard" state in the ShareGroupStatePartitionMetadata record with the "soft" state in the cluster metadata. This is how it observes relevant changes to topics, such as topic deletion. The ShareGroupStatePartitionMetadata record contains the set of topic-partitions which are known to be initialized.
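The reconciliation can be thought of as two set differences between the "hard" state and the cluster metadata. The sketch below is hypothetical. It models ShareGroupStatePartitionMetadata as a map from topic name to the number of initialized partitions, which simplifies the partition array and relies on Kafka topics gaining but never losing partitions. It also identifies topics by name only.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical reconciliation of initialized share-partition state against cluster metadata. */
final class StateReconciler {
    /** Topics recorded as initialized that are missing from the metadata: their state must be deleted. */
    static Set<String> topicsToDelete(Map<String, Integer> initialized, Map<String, Integer> metadata) {
        Set<String> result = new TreeSet<>(initialized.keySet());
        result.removeAll(metadata.keySet());
        return result;
    }

    /** Subscribed topic-partitions that exist in the metadata but have no initialized state yet. */
    static Map<String, Set<Integer>> partitionsToInitialize(Set<String> subscribed,
                                                            Map<String, Integer> initialized,
                                                            Map<String, Integer> metadata) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (String topic : subscribed) {
            Integer current = metadata.get(topic);
            if (current == null) continue;                    // topic not (or no longer) in metadata
            int known = initialized.getOrDefault(topic, 0);   // 0 means the topic is not yet initialized
            for (int p = known; p < current; p++) {
                result.computeIfAbsent(topic, k -> new TreeSet<>()).add(p);
            }
        }
        return result;
    }
}
```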

...

  • Sends the InitializeShareGroupState RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic added.

...

  • Sends the InitializeShareGroupState RPC to the share coordinators for all of the new partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the array of partitions for the topic extended to include the new partitions.
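Both initialization procedures follow the same pattern. The group coordinator sends InitializeShareGroupState for every pending partition and retries the failures. It writes the ShareGroupStatePartitionMetadata record only once every partition has succeeded. The sketch below assumes, as the steps above state, that the RPC can safely be repeated. TopicInitializer, the IntPredicate stand-in for the RPC, and the retry bound are all hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.IntPredicate;

/** Hypothetical sketch of initializing share-partition state for a topic. */
final class TopicInitializer {
    /**
     * @param partitions    partitions that need InitializeShareGroupState
     * @param initializeRpc stand-in for the RPC to the share coordinator; true means success
     * @param maxAttempts   retry bound used only by this sketch
     * @param writeRecord   stand-in for writing the ShareGroupStatePartitionMetadata record
     * @return true if the record was written
     */
    static boolean initialize(Set<Integer> partitions, IntPredicate initializeRpc, int maxAttempts,
                              Consumer<Set<Integer>> writeRecord) {
        Set<Integer> pending = new TreeSet<>(partitions);
        for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) {
            // Only partitions that have not yet succeeded are retried.
            pending.removeIf(p -> initializeRpc.test(p));
        }
        if (!pending.isEmpty()) {
            // Nothing is written; a later attempt (possibly by a new coordinator) repeats the RPCs.
            return false;
        }
        writeRecord.accept(new TreeSet<>(partitions));
        return true;
    }
}
```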

In order to remove a topic, the group coordinator:

  • If the topic still exists, it writes a ShareGroupStatePartitionMetadata record with the topic added to the DeletingTopics array. This enables group coordinator failover to continue the deletion. If the topic does not exist, the requirement to continue deletion can be inferred from the non-existence of the topic.
  • Sends the DeleteShareGroupState RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupStatePartitionMetadata record with the topic removed.
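The removal steps can be sketched as follows. The DeletingTopics array acts as a write-ahead marker, so a new group coordinator can finish an interrupted deletion. The sketch is hypothetical. StatePartitionMetadataSketch is an in-memory stand-in for the record, writes to __consumer_offsets are only counted, and the IntPredicate stands in for the DeleteShareGroupState RPC.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.function.IntPredicate;

/** Hypothetical in-memory view of the ShareGroupStatePartitionMetadata record. */
final class StatePartitionMetadataSketch {
    final Set<String> initializedTopics = new TreeSet<>();
    final Set<String> deletingTopics = new TreeSet<>();
    int recordsWritten = 0;   // counts simulated writes to __consumer_offsets

    void write() {
        recordsWritten++;
    }
}

/** Hypothetical sketch of removing a topic's share-partition state. */
final class TopicDeleter {
    /** Returns true once the topic has been removed from the record. */
    static boolean delete(StatePartitionMetadataSketch record, String topic, boolean topicStillExists,
                          int numPartitions, IntPredicate deleteRpc) {
        // Mark the deletion first so that a coordinator failover can resume it.
        if (topicStillExists && record.initializedTopics.remove(topic)) {
            record.deletingTopics.add(topic);
            record.write();
        }
        boolean allSucceeded = true;
        for (int p = 0; p < numPartitions; p++) {
            allSucceeded &= deleteRpc.test(p);   // every partition is attempted; failures are retried later
        }
        if (!allSucceeded) return false;
        record.initializedTopics.remove(topic);
        record.deletingTopics.remove(topic);
        record.write();
        return true;
    }
}
```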

...

The share-partition leader must be aware of when the group coordinator is being used to alter the SPSO with a KafkaAdmin.alterShareGroupOffsets request. This only occurs when the group is empty. As a result, when the set of share sessions transitions from 0 to 1, the share-partition leader uses the ReadShareGroupStateSummary RPC to validate its state epoch (this request is much cheaper for the share coordinator to handle than ReadShareGroupState). Because the group was empty, there are no acquired records, so re-initializing the share-partition leader is non-disruptive. If the state epoch has changed, the share-partition leader issues a ReadShareGroupState RPC to the share coordinator and uses the response to re-initialize.
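The check on the transition from zero to one share session can be sketched as follows. All names are hypothetical. ShareCoordinatorRpcs stands in for the ReadShareGroupStateSummary and ReadShareGroupState RPCs, and DurableState holds only the state epoch and SPSO. The in-flight record state that a real share-partition leader would reload is omitted.

```java
/** Hypothetical subset of the durable state returned by ReadShareGroupState. */
final class DurableState {
    final int stateEpoch;
    final long spso;

    DurableState(int stateEpoch, long spso) {
        this.stateEpoch = stateEpoch;
        this.spso = spso;
    }
}

/** Hypothetical stand-ins for the two share coordinator RPCs. */
interface ShareCoordinatorRpcs {
    int readStateSummaryEpoch();   // models the cheap ReadShareGroupStateSummary RPC
    DurableState readState();      // models the full ReadShareGroupState RPC
}

/** Hypothetical share-partition leader, tracking only what the check needs. */
final class SharePartitionLeaderSketch {
    private final ShareCoordinatorRpcs rpcs;
    private int shareSessions = 0;
    private int stateEpoch;
    private long spso;

    SharePartitionLeaderSketch(ShareCoordinatorRpcs rpcs) {
        this.rpcs = rpcs;
        reload();
    }

    private void reload() {
        DurableState state = rpcs.readState();
        stateEpoch = state.stateEpoch;
        spso = state.spso;
    }

    void openSession() {
        shareSessions++;
        // The SPSO can only have been altered while the group was empty, so only the
        // 0 -> 1 transition needs the cheap epoch check; reloading is safe because
        // there are no acquired records at that point.
        if (shareSessions == 1 && rpcs.readStateSummaryEpoch() != stateEpoch) {
            reload();
        }
    }

    void closeSession() {
        if (shareSessions > 0) shareSessions--;
    }

    long spso() {
        return spso;
    }
}
```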

...

Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The share coordinator is responsible for share-group persistent state.


The following table explains how the administration operations on share groups work.

Operation | Involves | Notes
Create share group | Group coordinator | This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat RPC. It writes a ShareGroupMetadata record and a ShareGroupPartitionMetadata record for the share group, and a ShareGroupMemberMetadata record for the member, onto the __consumer_offsets topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState RPC. It writes a ShareSnapshot record to the __share_group_state topic. When the record is replicated, the share coordinator responds to the RPC.

d) The group coordinator then writes a ShareGroupStatePartitionMetadata record on the __consumer_offsets topic for the share-partitions which are now initialized. The share-partitions are now able to be included in an assignment in the share group.

Assign a share-partition | Group coordinator and optionally share coordinator | When a topic-partition is eligible to be assigned to a member of a share group for the first time, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupStatePartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group.
List share groups | Group coordinator |
Describe share group | Group coordinator |
List share group offsets | Group coordinator and share coordinator | The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupStateSummary request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required.
Alter share group offsets | Group coordinator and share coordinator | Only empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata record, and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic. If the partition was not previously initialized, the group coordinator writes an updated ShareGroupStatePartitionMetadata record.
Delete share group offsets | Group coordinator and share coordinator | This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupStatePartitionMetadata record to the __consumer_offsets topic, adding the topic to the DeletingTopics array and removing it from the InitializedTopics array. This enables an interrupted deletion to be completed. The group coordinator sends a DeleteShareGroupState request to the share coordinator, which writes tombstones to logically delete the state. Then the group coordinator writes an updated ShareGroupStatePartitionMetadata record to the __consumer_offsets topic to complete the deletion of the topic from the share group.
Delete share group | Group coordinator and share coordinator | Only empty share groups can be deleted. If the share group has any topics with initialized state, the group coordinator writes a ShareGroupStatePartitionMetadata record to the __consumer_offsets topic moving all initialized topics into the DeletingTopics array, and then it sends a DeleteShareGroupState request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state topic. Then, the group coordinator writes a tombstone ShareGroupStatePartitionMetadata record and finally a tombstone ShareGroupMetadata record to the __consumer_offsets topic.

...

Method signature | Description
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) | Alter offset information for a share group.
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) | Alter offset information for a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics) | Delete offset information for a set of topics in a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) | Delete offset information for a set of topics in a share group.
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) | Delete share groups from the cluster.
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) | Delete share groups from the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) | Describe some share groups in the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) | Describe some share groups in the cluster.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) | List the share group offsets available in the cluster for the specified share groups.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) | List the share group offsets available in the cluster for the specified share groups.
ListShareGroupsResult listShareGroups() | List the share groups available in the cluster.
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) | List the share groups available in the cluster.
ListGroupsResult listGroups() | List the groups available in the cluster.
ListGroupsResult listGroups(ListGroupsOptions options) | List the groups available in the cluster.

The equivalence between the consumer group and share group interfaces is clear. There are some differences:

...

Code Block
languagejava
    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition.
     * @return The AlterShareGroupOffsetsResult.
     */
    default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
        return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
    }

    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This operation is not transactional, so it may succeed for some partitions while failing for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterShareGroupOffsetsResult.
     */
    AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options);

    /**
     * Delete offsets for a set of topics in a share group with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to delete offsets.
     * @param topics The topics whose offsets to delete.
     * @return The DeleteShareGroupOffsetsResult.
     */
    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics) {
        return deleteShareGroupOffsets(groupId, topics, new DeleteShareGroupOffsetsOptions());
    }

    /**
     * Delete offsets for a set of topics in a share group. In order to succeed, the group must be empty.
     *
     * @param groupId The group for which to delete offsets.
     * @param topics The topics whose offsets to delete.
     * @param options The options to use when deleting offsets in a share group.
     * @return The DeleteShareGroupOffsetsResult.
     */
    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
        Set<String> topics,
        DeleteShareGroupOffsetsOptions options);

    /**
     * Delete share groups from the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroups(Collection, DeleteShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupIds The IDs of the groups to delete.
     * @return The DeleteShareGroupsResult.
     */
    default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
    }

    /**
     * Delete share groups from the cluster.
     *
     * @param groupIds The IDs of the groups to delete.
     * @param options The options to use when deleting a share group.
     * @return The DeleteShareGroupsResult.
     */
    DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);

    /**
     * Describe some share groups in the cluster, with the default options.
     *
     * <p>This is a convenience method for {@link #describeShareGroups(Collection, DescribeShareGroupsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.
     * @return The DescribeShareGroupsResult.
     */
    default DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) {
        return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
    }

    /**
     * Describe some share groups in the cluster.
     *
     * @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * @return The DescribeShareGroupsResult.
     */
    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
                                                 DescribeShareGroupsOptions options);

    /**
     * List the share group offsets available in the cluster for the specified share groups with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}
     * to list offsets of all partitions for the specified share groups with default options.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @return The ListShareGroupOffsetsResult
     */
    default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) {
        return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
    }

    /**
     * List the share group offsets available in the cluster for the specified share groups.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @param options The options to use when listing the share group offsets.
     * @return The ListShareGroupOffsetsResult
     */
    ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options);

    /**
     * List the share groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroups(ListShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListShareGroupsResult.
     */
    default ListShareGroupsResult listShareGroups() {
        return listShareGroups(new ListShareGroupsOptions());
    }

    /**
     * List the share groups available in the cluster.
     *
     * @param options The options to use when listing the share groups.
     * @return The ListShareGroupsResult.
     */
    ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);

    /**
     * List the groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listGroups(ListGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListGroupsResult.
     */
    default ListGroupsResult listGroups() {
        return listGroups(new ListGroupsOptions());
    }

    /**
     * List the groups available in the cluster.
     *
     * @param options The options to use when listing the groups.
     * @return The ListGroupsResult.
     */
    ListGroupsResult listGroups(ListGroupsOptions options);

AlterShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * The result of the {@link Admin#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
    /**
     * Return a future which succeeds if all the alter offsets succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}

AlterShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Options for the {@link Admin#alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions> {
}

DeleteShareGroupOffsetsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * The result of the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given topic.
     */
    public KafkaFuture<Void> partitionResult(final String topic) {
    }
}

...

DeleteShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Options for the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions> {
}

...

DeleteShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

import java.util.Map;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * The result of the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a map from group id to futures which can be used to check the status of individual deletions.
     */
    public Map<String, KafkaFuture<Void>> deletedGroups() {
    }
}

...

DeleteShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Options for the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupsOptions> {
}

...

DescribeShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;

import java.util.Map;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * The result of the {@link Admin#describeShareGroups(Collection, DescribeShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsResult {
    /**
     * Return a future which yields all ShareGroupDescription objects, if all the describes succeed.
     */
    public KafkaFuture<Map<String, ShareGroupDescription>> all() {
    }

    /**
     * Return a map from group id to futures which yield group descriptions.
     */
    public Map<String, KafkaFuture<ShareGroupDescription>> describedGroups() {
    }
}

...

ShareGroupDescription

This class reuses the MemberDescription class intended for consumer groups, which is sufficiently general to work for share groups as well.

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.acl.AclOperation;

/**
 * A detailed description of a single share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupDescription {
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator);
  public ShareGroupDescription(String groupId, Collection<MemberDescription> members, ShareGroupState state, Node coordinator, Set<AclOperation> authorizedOperations);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * A list of the members of the share group.
   */
  public Collection<MemberDescription> members();

  /**
   * The share group state, or UNKNOWN if the state cannot be parsed.
   */
  public ShareGroupState state();

  /**
   * The group coordinator, or null if the coordinator is not known.
   */
  public Node coordinator();

  /**
   * The authorized operations for this group, or null if that information is not known.
   */
  public Set<AclOperation> authorizedOperations();
}

DescribeShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * Options for {@link Admin#describeShareGroups(Collection<String>, DescribeShareGroupsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DescribeShareGroupsOptions extends AbstractOptions<DescribeShareGroupsOptions> {
    public DescribeShareGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations);

    public boolean includeAuthorizedOperations();
}

ListShareGroupOffsetsResult

The offset returned for each topic-partition is the share-partition start offset (SPSO).

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
    /**
     * Return a future which yields all Map<String, Map<TopicPartition, Long>> objects, if requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
    }

    /**
     * Return a future which yields a map of topic partitions to offsets for the specified group.
     */
    public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String groupId) {
    }
}

ListShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * Options for {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsOptions extends AbstractOptions<ListShareGroupOffsetsOptions> {
}

ListShareGroupOffsetsSpec

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsSpec {
  public ListShareGroupOffsetsSpec();

  /**
   * Set the topic partitions whose offsets are to be listed for a share group.
   */
  public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions);

  /**
   * Returns the topic partitions whose offsets are to be listed for a share group.
   */
  public Collection<TopicPartition> topicPartitions();
}

ListShareGroupsResult

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#listShareGroups(ListShareGroupsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsResult {
    /**
     * Returns a future that yields either an exception, or the full set of share group listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> all() {
    }

    /**
     * Returns a future which yields just the valid listings.
     */
    public KafkaFuture<Collection<ShareGroupListing>> valid() {
    }
 
    /**
     * Returns a future which yields just the errors which occurred.
     */
    public KafkaFuture<Collection<Throwable>> errors() {
    }
}
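The split between all(), valid() and errors() can be modelled as below. This is a hypothetical sketch, not the Admin client code, and it simplifies in three ways. First, CompletableFuture stands in for KafkaFuture. Second, listings are reduced to group id strings instead of ShareGroupListing objects. Third, all() is assumed to fail with the first error recorded.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of ListShareGroupsResult. Listings are plain group ids and
// CompletableFuture stands in for KafkaFuture.
final class ListShareGroupsResultSketch {
    private final List<String> validListings = new ArrayList<>();
    private final List<Throwable> errorList = new ArrayList<>();

    // Records the share group ids returned by one broker.
    void addListings(Collection<String> groupIds) {
        validListings.addAll(groupIds);
    }

    // Records a failure to list the share groups on one broker.
    void addError(Throwable error) {
        errorList.add(error);
    }

    // Just the listings which were retrieved successfully.
    CompletableFuture<Collection<String>> valid() {
        return CompletableFuture.completedFuture(new ArrayList<>(validListings));
    }

    // Just the errors which occurred.
    CompletableFuture<Collection<Throwable>> errors() {
        return CompletableFuture.completedFuture(new ArrayList<>(errorList));
    }

    // Either an exception (here, the first error recorded) or the full set of listings.
    CompletableFuture<Collection<String>> all() {
        if (!errorList.isEmpty()) {
            CompletableFuture<Collection<String>> failed = new CompletableFuture<>();
            failed.completeExceptionally(errorList.get(0));
            return failed;
        }
        return valid();
    }
}
```

A caller that can tolerate partial results would read valid() and errors(). A caller that needs a complete picture would use all().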


ShareGroupListing

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * A listing of a share group in the cluster.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ShareGroupListing {
  public ShareGroupListing(String groupId);
  public ShareGroupListing(String groupId, Optional<ShareGroupState> state);

  /**
   * The id of the share group.
   */
  public String groupId();

  /**
   * The share group state.
   */
  public Optional<ShareGroupState> state();
}

ListShareGroupsOptions

Code Block
languagejava
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.ShareGroupState;

/**
 * Options for {@link Admin#listShareGroups(ListShareGroupsOptions)}.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupsOptions extends AbstractOptions<ListShareGroupsOptions> {
    /**
     * If states is set, only groups in these states will be returned. Otherwise, all groups are returned.
     */
    public ListShareGroupsOptions inStates(Set<ShareGroupState> states);

    /**
     * Return the set of states that are requested, or empty if no states have been specified.
     */
    public Set<ShareGroupState> states();
}
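The inStates semantics can be illustrated with a small standalone filter: an empty state set returns every group, and a non-empty set keeps only the matching groups. The names InStatesFilterSketch and filter are hypothetical. The local State enum only mirrors the constants of ShareGroupState, and the sketch says nothing about where the broker applies the filter.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the state filter described for ListShareGroupsOptions.inStates.
final class InStatesFilterSketch {
    // Local stand-in with the same constants as ShareGroupState.
    enum State { DEAD, EMPTY, STABLE, UNKNOWN }

    // Returns the ids of the groups whose state is requested; an empty set of
    // requested states means that all groups are returned.
    static List<String> filter(Map<String, State> groupStates, Set<State> requested) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, State> entry : groupStates.entrySet()) {
            if (requested.isEmpty() || requested.contains(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, State> groups = new LinkedHashMap<>();
        groups.put("S1", State.STABLE);
        groups.put("S2", State.EMPTY);
        System.out.println(filter(groups, EnumSet.noneOf(State.class))); // [S1, S2]
        System.out.println(filter(groups, EnumSet.of(State.STABLE)));    // [S1]
    }
}
```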

GroupType

Another case is added to the org.apache.kafka.common.GroupType enum:

Enum constant

Description

SHARE("share")

Share group

ShareGroupState

A new enum org.apache.kafka.common.ShareGroupState is added:

Enum constant

DEAD

EMPTY

STABLE

UNKNOWN

Its definition follows the pattern of ConsumerGroupState with fewer states.
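This sketch assumes the pattern in question is that of ConsumerGroupState in the Kafka clients. That enum carries a display name for each state and has a parse method that falls back to UNKNOWN. A share-group equivalent might look like the code below. The name ShareGroupStateSketch is hypothetical. The display names are assumptions; 'Empty', 'Stable' and 'Dead' are the values accepted by the --state option of kafka-share-groups.sh.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of ShareGroupState following the ConsumerGroupState pattern.
enum ShareGroupStateSketch {
    UNKNOWN("Unknown"),
    EMPTY("Empty"),
    STABLE("Stable"),
    DEAD("Dead");

    // Case-insensitive lookup table from display name to constant.
    private static final Map<String, ShareGroupStateSketch> NAME_TO_ENUM = Arrays.stream(values())
        .collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity()));

    private final String name;

    ShareGroupStateSketch(String name) {
        this.name = name;
    }

    // Parses a state name, returning UNKNOWN for null or unrecognized input.
    static ShareGroupStateSketch parse(String name) {
        if (name == null) {
            return UNKNOWN;
        }
        ShareGroupStateSketch state = NAME_TO_ENUM.get(name.toUpperCase(Locale.ROOT));
        return state == null ? UNKNOWN : state;
    }

    @Override
    public String toString() {
        return name;
    }
}
```

The UNKNOWN fallback is what lets ShareGroupDescription.state() report UNKNOWN when the state cannot be parsed.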

Exceptions

The following new exceptions are added to the org.apache.kafka.common.errors  package corresponding to the new error codes in the Kafka protocol.

  • InvalidRecordStateException  - The record state is invalid. The acknowledgement of delivery could not be completed.
  • ShareSessionNotFoundException  - The share session is not found.
  • InvalidShareSessionEpochException  - The share session epoch is invalid.
  • FencedStateEpochException  - The share coordinator rejected the request because the share-group state epoch did not match.

They are all subclasses of RetriableException.
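Because all four exceptions are subclasses of RetriableException, a caller can treat them as transient. The sketch below shows one way a caller might retry such failures a bounded number of times. RetriableException and ShareSessionNotFoundException are declared locally as stand-ins so that the example runs without Kafka, and callWithRetries is a hypothetical helper. This section does not specify whether, or how, the share consumer retries these errors internally.

```java
import java.util.function.Supplier;

final class RetrySketch {
    // Local stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    // Local stand-in for one of the new retriable exceptions.
    static class ShareSessionNotFoundException extends RetriableException {
        ShareSessionNotFoundException(String message) {
            super(message);
        }
    }

    // Runs the operation, retrying retriable failures up to maxAttempts in total.
    // Non-retriable exceptions propagate immediately.
    static <T> T callWithRetries(Supplier<T> operation, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetriableException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RetriableException e) {
                lastFailure = e; // transient: try again
            }
        }
        throw lastFailure;
    }
}
```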

Broker API

ConsumerGroupPartitionAssignor

The new org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor interface is implemented by server-side assignors for consumer groups. It signifies that the partition assignor is suitable for use with consumer groups.

Code Block
languagejava
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Server-side partition assignor for consumer groups used by the GroupCoordinator.
 *
 * The interface is kept in an internal module until KIP-848 is fully
 * implemented and ready to be released.
 */
@InterfaceStability.Unstable
public interface ConsumerGroupPartitionAssignor extends PartitionAssignor {
}

The two built-in partition assignors for consumer groups, org.apache.kafka.coordinator.group.assignor.RangeAssignor  and org.apache.kafka.coordinator.group.assignor.UniformAssignor , are both changed to implement this interface instead of org.apache.kafka.coordinator.group.assignor.PartitionAssignor  because they are intended only for use with consumer groups.

ShareGroupPartitionAssignor

The new org.apache.kafka.coordinator.group.assignor.ShareGroupPartitionAssignor interface is implemented by server-side assignors for share groups. It signifies that the partition assignor is suitable for use with share groups.

Code Block
languagejava
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Server-side partition assignor for share groups used by the GroupCoordinator.
 *
 * The interface is kept in an internal module until KIP-932 is fully
 * implemented and ready to be released.
 */
@InterfaceStability.Unstable
public interface ShareGroupPartitionAssignor extends PartitionAssignor {
}

One implementation of this interface, org.apache.kafka.coordinator.group.share.SimpleAssignor, is provided.
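The marker interfaces let the group coordinator tell, from the type alone, which kind of group a configured assignor is meant for. The dependency-free sketch below illustrates this by rejecting a group.share.assignors entry that is not a ShareGroupPartitionAssignor. The interfaces are simplified local stand-ins that model only a name() method. The shareAssignors helper and the names "range" and "simple" are hypothetical, and the real validation in the broker may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of the marker-interface pattern; the real
// interfaces live in org.apache.kafka.coordinator.group.assignor.
final class AssignorMarkerSketch {
    interface PartitionAssignor { String name(); }
    interface ConsumerGroupPartitionAssignor extends PartitionAssignor { }
    interface ShareGroupPartitionAssignor extends PartitionAssignor { }

    static final class RangeAssignor implements ConsumerGroupPartitionAssignor {
        @Override
        public String name() { return "range"; }
    }

    static final class SimpleAssignor implements ShareGroupPartitionAssignor {
        @Override
        public String name() { return "simple"; }
    }

    // Rejects any configured assignor that is not intended for share groups.
    static List<ShareGroupPartitionAssignor> shareAssignors(List<PartitionAssignor> configured) {
        List<ShareGroupPartitionAssignor> result = new ArrayList<>();
        for (PartitionAssignor assignor : configured) {
            if (!(assignor instanceof ShareGroupPartitionAssignor)) {
                throw new IllegalArgumentException(
                    assignor.getClass().getSimpleName() + " is not a share group assignor");
            }
            result.add((ShareGroupPartitionAssignor) assignor);
        }
        return result;
    }
}
```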

Command-line tools

kafka-share-groups.sh

A new tool called kafka-share-groups.sh is added for working with share groups. It has the following options:

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Delete share group.

--delete-offsets

Delete offsets of share group. Supports one share group at a time, and multiple topics.

--describe

Describe share group, members and offset information.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--help

Print usage information.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of, and may be used with, the '--describe' option only.

--reset-offsets

Reset offsets of share group. Supports one share group at a time, and instances must be inactive. It has two execution options, '--dry-run' (the default) and '--execute', and three reset options: '--to-earliest', '--to-latest' and '--to-datetime'.

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Stable' and 'Dead'.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes. (default: 5000)   

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or included in the reset offset process. When resetting offsets, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included.

--version

Display Kafka version.
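The --topic format accepted when resetting offsets (topic1:0,1,2) can be parsed as in the sketch below. TopicArgSketch and its parse method are hypothetical and are not the tool's implementation. The sketch assumes that a topic given without a partition list means all partitions of that topic, represented as an empty list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical parser for the --topic value used with --reset-offsets.
final class TopicArgSketch {
    final String topic;
    // Empty means "all partitions of the topic" (an assumption of this sketch).
    final List<Integer> partitions;

    private TopicArgSketch(String topic, List<Integer> partitions) {
        this.topic = topic;
        this.partitions = partitions;
    }

    // Kafka topic names cannot contain ':', so the first colon separates the
    // topic name from its comma-separated partition list.
    static TopicArgSketch parse(String arg) {
        int colon = arg.indexOf(':');
        if (colon < 0) {
            return new TopicArgSketch(arg, Collections.emptyList());
        }
        List<Integer> partitions = new ArrayList<>();
        for (String part : arg.substring(colon + 1).split(",")) {
            partitions.add(Integer.parseInt(part.trim()));
        }
        return new TopicArgSketch(arg.substring(0, colon), Collections.unmodifiableList(partitions));
    }
}
```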

Here are some examples. 

To display a list of all share groups:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --list

To delete the information for topic T1 from inactive share group S1, which essentially resets the consumption of this topic in the share group:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --delete-offsets

To set the starting offset for consuming topic T1 in inactive share group S1 to a specific date and time:

$ kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --reset-offsets --to-datetime 1999-12-31T23:57:00.000 --execute
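The value passed to --to-datetime in the last example, 1999-12-31T23:57:00.000, follows the 'YYYY-MM-DDTHH:mm:SS.sss' format. The sketch below converts such a string to epoch milliseconds with java.time. The helper name is hypothetical, and the sketch assumes the datetime is interpreted in UTC; this section does not describe the tool's own time-zone handling.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class ResetDatetimeSketch {
    // The 'YYYY-MM-DDTHH:mm:SS.sss' format expressed as a java.time pattern.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");

    // Assumption of this sketch: the datetime is interpreted in UTC.
    static long toEpochMillis(String datetime) {
        return LocalDateTime.parse(datetime, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("1999-12-31T23:57:00.000")); // 946684620000
    }
}
```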

kafka-console-share-consumer.sh

A new tool called kafka-console-share-consumer.sh  is added for reading data from Kafka topics using a share group and outputting to standard output. This is similar to kafka-console-consumer.sh  but using a share group and supporting the various acknowledge modes. It has the following options:

Option

Description

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--consumer-config <String: config file>

Consumer config properties file. Note that [consumer-property] takes precedence over this config.

--consumer-property <String: consumer_prop>

Consumer property in the form key=value.

--enable-systest-events

Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)

--formatter <String: class>

The name of a class to use for formatting Kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)

--formatter-config <String: config file>

Config properties file to initialize the message formatter. Note that [property] takes precedence over this config.

--group <String: share group id>

The share group id of the consumer. (default: "console-share-consumer")

--help

Print usage information.

--key-deserializer <String: deserializer for keys>

The name of the class to use for deserializing keys.

--max-messages <Integer: num_messages>

The maximum number of messages to consume before exiting. If not set, consumption is continual.

--property <String: prop>

The properties to initialize the message formatter. Default properties include:

 print.timestamp=true|false

 print.key=true|false

 print.offset=true|false

 print.delivery=true|false

 print.partition=true|false

 print.headers=true|false

 print.value=true|false

 key.separator=<key.separator>

 line.separator=<line.separator>

 headers.separator=<line.separator>

 null.literal=<null.literal>

 key.deserializer=<key.deserializer>

 value.deserializer=<value.deserializer>

 header.deserializer=<header.deserializer>

Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.

--reject

If specified, messages are rejected as they are consumed.

--reject-message-on-error

If there is an error when processing a message, reject it instead of halting.

--release

If specified, messages are released as they are consumed.

--timeout-ms <Integer: timeout_ms>

If specified, exit if no message is available for consumption for the specified interval.

--topic <String: topic>

REQUIRED: The topic to consume from.

--value-deserializer <String: deserializer for values>

The name of the class to use for deserializing values.

--version

Display Kafka version.
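The --reject, --release and --reject-message-on-error options decide how each consumed message is acknowledged. The sketch below is one hypothetical reading of those option descriptions. The AckType enum is a local stand-in for the accept, release and reject acknowledgement types of share groups. The sketch assumes that messages are accepted when neither --reject nor --release is given, and that those two options cannot be combined.

```java
// Hypothetical mapping from kafka-console-share-consumer.sh options to an acknowledgement.
final class ConsoleAckSketch {
    // Local stand-in for the accept/release/reject acknowledgement types.
    enum AckType { ACCEPT, RELEASE, REJECT }

    static AckType acknowledgementFor(boolean reject, boolean release,
                                      boolean rejectMessageOnError, boolean processingFailed) {
        if (reject && release) {
            throw new IllegalArgumentException("--reject and --release are assumed to be mutually exclusive");
        }
        if (processingFailed) {
            if (rejectMessageOnError) {
                return AckType.REJECT; // --reject-message-on-error: reject instead of halting
            }
            throw new IllegalStateException("error processing message; halting");
        }
        if (reject) {
            return AckType.REJECT;
        }
        if (release) {
            return AckType.RELEASE;
        }
        return AckType.ACCEPT; // assumed default
    }
}
```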

...

ConfigurationDescriptionValues
group.share.enableWhether to enable share groups on the broker.Default false  while the feature is being developed. This is an internal configuration.group.coordinator.rebalance.protocols The list of enabled rebalance protocols. (Existing configuration)

"share" 

will be added to the

is included in the list of protocols to enable share groups.

This will be added to the default value of this configuration property once this feature is complete.

group.share.delivery.count.limitThe maximum number of delivery attempts for a record delivered to a share group.Default 5, minimum 2, maximum 10
group.share.record.lock.duration.msShare-group record acquisition lock duration in milliseconds.Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)
group.share.min.record.lock.duration.ms Share-group record acquisition lock minimum duration in milliseconds.Default 15000 (15 seconds), minimum 1000 (1 second), maximum 30000 (30 seconds)
group.share.max.record.lock.duration.msShare-group record acquisition lock maximum duration in milliseconds.Default 60000 (60 seconds), minimum 30000 (30 seconds), maximum 3600000 (1 hour)
group.share.partition.max.record.locksShare-group record lock limit per share-partition.Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms 

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.groups 

The maximum number of share groups.

Default 10, minimum 1, maximum 100
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000
group.share.assignors 

The server-side assignors as a list of full class names. The list must contain only a single entry which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors.

A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor"
groupshare.sharecoordinator.state.topic.num.partitions 

The number of partitions for the share-group state topic (should not change after deployment).

Default 50
groupshare.sharecoordinator.state.topic.replication.factor 

The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
groupshare.sharecoordinator.state.topic.segment.bytes 

The log segment size for the share-group state topic.

Default 104857600
groupshare.sharecoordinator.state.topic.min.isr 

Overridden min.insync.replicas for the share-group state topic.

Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.threads 

The number of threads used by the share coordinator.

Default 1, minimum 1

Group configuration

The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.

.state.topic.compression.codec 

Compression codec for the share-group state topic.

Default 0 (NONE)
share.coordinator.append.linger.ms 

The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.

Default 10, minimum 0
share.coordinator.load.buffer.size 

Batch size for reading from the share-group state topic when loading state information into the cache (soft-limit, overridden if records are too large).

Default 5 * 1024 * 1024 (5242880), minimum 1
share.coordinator.snapshot.update.records.per.snapshot 

The number of update records the share coordinator writes between snapshot records.

Default 500, minimum 0
share.coordinator.threads 

The number of threads used by the share coordinator.

Default 1, minimum 1
share.coordinator.write.timeout.ms 

The duration in milliseconds that the share coordinator will wait for all replicas of the share-group state topic to receive a write.

Default 5000, minimum 1
share.fetch.purgatory.purge.interval.requests 

The purge interval (in number of requests) of the share fetch request purgatory.

Default 1000
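The counting rule behind share.coordinator.snapshot.update.records.per.snapshot can be sketched as follows. This is a minimal illustration, not the share coordinator's implementation: the class name is hypothetical, and it assumes that the first record written for a share-partition is always a snapshot and that a value of 0 makes every record a snapshot.

```java
/**
 * Hypothetical sketch: decides whether the next share-group state record for a
 * share-partition is a full snapshot or an incremental update.
 * Assumptions: the first record is a snapshot; 0 means every record is a snapshot.
 */
final class SnapshotCadence {
    enum RecordKind { SNAPSHOT, UPDATE }

    private final int updatesPerSnapshot;  // share.coordinator.snapshot.update.records.per.snapshot
    private int updatesSinceSnapshot = -1; // -1: nothing written yet

    SnapshotCadence(int updatesPerSnapshot) {
        if (updatesPerSnapshot < 0) {
            throw new IllegalArgumentException("minimum is 0");
        }
        this.updatesPerSnapshot = updatesPerSnapshot;
    }

    /** Returns the kind of record to write next and advances the counter. */
    RecordKind next() {
        if (updatesSinceSnapshot < 0 || updatesSinceSnapshot >= updatesPerSnapshot) {
            updatesSinceSnapshot = 0;     // a snapshot resets the count of updates
            return RecordKind.SNAPSHOT;
        }
        updatesSinceSnapshot++;
        return RecordKind.UPDATE;
    }

    public static void main(String[] args) {
        SnapshotCadence cadence = new SnapshotCadence(2);
        for (int i = 0; i < 6; i++) {
            System.out.print(cadence.next() + " ");
        }
        // prints SNAPSHOT UPDATE UPDATE SNAPSHOT UPDATE UPDATE
    }
}
```

With the default of 500, the sketch writes one snapshot, then 500 update records, then the next snapshot.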

Group configuration

The following dynamic group configuration properties are added. These are properties for which it would be problematic to have consumers in the same share group using different behavior if the properties were specified in the consumer clients themselves.

ConfigurationDescriptionValues
group.share.isolation.level 

Controls how to read records written transactionally. If set to "read_committed", the share group will only deliver transactional records which have been committed. If set to "read_uncommitted", the share group will return all messages, even transactional messages which have been aborted. Non-transactional records will be returned unconditionally in either mode.

Valid values "read_committed"  and "read_uncommitted" (default)

group.share.auto.offset.reset 

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server:

  • "earliest" : automatically reset the offset to the earliest offset

  • "latest" : automatically reset the offset to the latest offset

Valid values "latest"  (default) and "earliest" 

group.share.record.lock.duration.ms 

Record acquisition lock duration in milliseconds.

If not specified, uses the broker configuration group.share.record.lock.duration.ms.

If specified, minimum limited by the broker configuration group.share.min.record.lock.duration.ms , maximum limited by the broker configuration group.share.max.record.lock.duration.ms

group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

If not specified, uses the broker configuration group.share.heartbeat.interval.ms.

If specified, minimum limited by the broker configuration group.share.min.heartbeat.interval.ms , maximum limited by the broker configuration group.share.max.heartbeat.interval.ms

group.share.session.timeout.ms 

The timeout to detect client failures when using the share group protocol.

If not specified, uses the broker configuration group.share.session.timeout.ms .

If specified, minimum limited by the broker configuration group.share.min.session.timeout.ms , maximum limited by the broker configuration group.share.max.session.timeout.ms .
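The dynamic group configurations above share one resolution rule: if unset, use the broker configuration; if set, stay within the broker's minimum and maximum. A minimal sketch with a hypothetical class name, assuming that an out-of-range group value is rejected rather than silently clamped (the text says only that it is limited by the broker bounds):

```java
import java.util.OptionalInt;

/**
 * Hypothetical sketch of resolving a share-group dynamic configuration such as
 * group.share.record.lock.duration.ms against the broker configuration.
 * Assumption: an out-of-range group value is rejected, not clamped.
 */
final class GroupConfigResolver {

    /**
     * @param groupValue  the group-level value, empty if not specified
     * @param brokerValue broker default, e.g. group.share.record.lock.duration.ms
     * @param brokerMin   e.g. group.share.min.record.lock.duration.ms
     * @param brokerMax   e.g. group.share.max.record.lock.duration.ms
     */
    static int resolve(OptionalInt groupValue, int brokerValue, int brokerMin, int brokerMax) {
        if (groupValue.isEmpty()) {
            return brokerValue; // not specified: fall back to the broker configuration
        }
        int value = groupValue.getAsInt();
        if (value < brokerMin || value > brokerMax) {
            throw new IllegalArgumentException(
                "Value " + value + " is outside the broker limits [" + brokerMin + ", " + brokerMax + "]");
        }
        return value;
    }

    public static void main(String[] args) {
        // Illustrative numbers only; they are not the documented defaults.
        System.out.println(resolve(OptionalInt.empty(), 30000, 15000, 60000));  // 30000
        System.out.println(resolve(OptionalInt.of(20000), 30000, 15000, 60000)); // 20000
    }
}
```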

Consumer configuration

The existing consumer configurations apply for share groups with the following exceptions:

  • auto.offset.reset : this is handled by a dynamic group configuration group.share.auto.offset.reset 
  • enable.auto.commit  and auto.commit.interval.ms : share groups do not support auto-commit
  • group.instance.id : this concept is not supported by share groups
  • isolation.level : this is handled by a dynamic group configuration group.share.isolation.level 
  • partition.assignment.strategy : share groups do not support client-side partition assignors
  • interceptor.classes : interceptors are not supported
  • protocol.type : this configuration is used to select the group protocol used for KafkaConsumer
  • session.timeout.ms : this is deprecated in KIP-848 and is not supported for share groups
  • heartbeat.interval.ms : this is deprecated in KIP-848 and is not supported for share groups
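To illustrate the exceptions listed above, a hypothetical helper could report consumer properties that have no effect for a share group. It is not part of the Kafka client API, and the reason strings only paraphrase the list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;

/**
 * Hypothetical helper: flags consumer properties from the list above that do not
 * apply to share groups. Not part of the Kafka client API.
 */
final class ShareConsumerConfigCheck {
    private static final Map<String, String> NOT_APPLICABLE = Map.ofEntries(
        Map.entry("auto.offset.reset", "use group configuration group.share.auto.offset.reset"),
        Map.entry("enable.auto.commit", "share groups do not support auto-commit"),
        Map.entry("auto.commit.interval.ms", "share groups do not support auto-commit"),
        Map.entry("group.instance.id", "not supported by share groups"),
        Map.entry("isolation.level", "use group configuration group.share.isolation.level"),
        Map.entry("partition.assignment.strategy", "no client-side partition assignors"),
        Map.entry("interceptor.classes", "interceptors are not supported"),
        Map.entry("protocol.type", "selects the group protocol for KafkaConsumer"),
        Map.entry("session.timeout.ms", "deprecated in KIP-848, not supported"),
        Map.entry("heartbeat.interval.ms", "deprecated in KIP-848, not supported"));

    /** Returns "key: reason" for each property that does not apply, sorted by key. */
    static List<String> check(Properties props) {
        List<String> problems = new ArrayList<>();
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            String reason = NOT_APPLICABLE.get(key);
            if (reason != null) {
                problems.add(key + ": " + reason);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("group.id", "my-share-group");
        props.setProperty("isolation.level", "read_committed");
        props.setProperty("enable.auto.commit", "true");
        check(props).forEach(System.out::println);
    }
}
```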

...

  • ShareGroupHeartbeat - for consumers to form and maintain share groups
  • ShareGroupDescribe - for describing share groups
  • ShareFetch - for fetching records from share-partition leaders
  • ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
  • AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
  • DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
  • DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group
  • InitializeShareGroupState  - for initializing share-partition state on a share coordinator
  • ReadShareGroupState - for reading share-partition state from a share coordinator
  • WriteShareGroupState - for writing share-partition state to a share coordinator
  • DeleteShareGroupState - for deleting share-partition state from a share coordinator
  • ReadShareGroupStateSummary  - for reading a summary of the share-partition state from a share coordinator

...

  • FindCoordinator  - for finding coordinators, to support share coordinators
  • ListGroups  - for listing groups, to support listing share groups

Access control

This table gives the ACLs required for the new APIs.

...

This KIP adds the following error codes to the Kafka protocol.

  • INVALID_RECORD_STATE  (121) - The record state is invalid. The acknowledgement of delivery could not be completed.
  • SHARE_SESSION_NOT_FOUND  (122) - The share session is not found.
  • INVALID_SHARE_SESSION_EPOCH  (123) - The share session epoch is invalid.
  • FENCED_STATE_EPOCH  (124) - The share coordinator rejected the request because the share-group state epoch did not match.
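The mapping from the new int16 error codes to their meanings can be sketched as a small enum. This is a standalone illustration only; the Kafka clients keep all protocol errors in a single shared table, and this enum is not that table.

```java
import java.util.Optional;

/**
 * Standalone sketch of the four error codes added by this KIP, as listed above.
 * Illustrative only; not the Kafka client's error table.
 */
enum ShareGroupError {
    INVALID_RECORD_STATE((short) 121, "The record state is invalid. The acknowledgement of delivery could not be completed."),
    SHARE_SESSION_NOT_FOUND((short) 122, "The share session is not found."),
    INVALID_SHARE_SESSION_EPOCH((short) 123, "The share session epoch is invalid."),
    FENCED_STATE_EPOCH((short) 124, "The share coordinator rejected the request because the share-group state epoch did not match.");

    final short code;      // int16 value carried in an ErrorCode field
    final String message;

    ShareGroupError(short code, String message) {
        this.code = code;
        this.message = message;
    }

    /** Looks up one of the new errors by its wire code; empty for any other code. */
    static Optional<ShareGroupError> forCode(short code) {
        for (ShareGroupError error : values()) {
            if (error.code == code) {
                return Optional.of(error);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(forCode((short) 124).map(e -> e.name()).orElse("not a share-group error"));
        // prints FENCED_STATE_EPOCH
    }
}
```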

...

This KIP introduces version 6.

Request schema

Version 6 adds the new key type FindCoordinatorRequest.CoordinatorType.SHARE with value 2, whose key has the form "group:topicId:partition".

Code Block
{
  "apiKey": 10,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "FindCoordinatorRequest",
  // Version 1 adds KeyType.
  //
  // Version 2 is the same as version 1.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds support for batching via CoordinatorKeys (KIP-699)
  //
  // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
  //
  // Version 6 adds support for share groups (KIP-932).
  "validVersions": "0-6",
  "deprecatedVersions": "0",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "Key", "type": "string", "versions": "0-3",
      "about": "The coordinator key." },
    { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
      "about": "The coordinator key type. (Group, transaction, share, etc.)" },
    { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
      "about": "The coordinator keys." }
  ]
}
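The share coordinator key can be built and parsed as sketched below. The class is hypothetical; it assumes the topic ID is written in its string form, which contains no ':', whereas a group ID might contain one, so the parser splits on the last two colons.

```java
/**
 * Hypothetical sketch of the FindCoordinator key for CoordinatorType.SHARE (value 2).
 * Assumptions: the topic ID string never contains ':', but the group ID may,
 * so parsing splits on the last two colons.
 */
final class ShareCoordinatorKey {
    static final byte SHARE_KEY_TYPE = 2; // FindCoordinatorRequest.CoordinatorType.SHARE

    final String groupId;
    final String topicId;
    final int partition;

    ShareCoordinatorKey(String groupId, String topicId, int partition) {
        this.groupId = groupId;
        this.topicId = topicId;
        this.partition = partition;
    }

    /** Formats the key as "group:topicId:partition". */
    String asKey() {
        return groupId + ":" + topicId + ":" + partition;
    }

    static ShareCoordinatorKey parse(String key) {
        int last = key.lastIndexOf(':');
        int middle = last > 0 ? key.lastIndexOf(':', last - 1) : -1;
        if (middle < 0) {
            throw new IllegalArgumentException("Expected group:topicId:partition but got " + key);
        }
        return new ShareCoordinatorKey(
            key.substring(0, middle),
            key.substring(middle + 1, last),
            Integer.parseInt(key.substring(last + 1)));
    }
}
```

For example, the made-up key "orders:group:AAAAAAAAAAAAAAAAAAAAAQ:3" parses to group "orders:group", topic ID "AAAAAAAAAAAAAAAAAAAAAQ" and partition 3.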

Response schema

Version 6 is the same as version 5.

ListGroups API

This KIP introduces version 6 

Request schema

Version 6 adds support for share groups.

Code Block
{
  "apiKey": 16,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListGroupsRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the StatesFilter field (KIP-518).
  //
  // Version 5 adds the TypesFilter field (KIP-848).
  //
  // Version 6 adds support for share groups.
  "validVersions": "0-6",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "4+",
      "about": "The states of the groups we want to list. If empty, all groups are returned with their state." },
    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
      "about": "The types of the groups we want to list. If empty, all groups are returned with their type." }
  ]
}

Response schema

Version 6 is the same as version 5.

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

Request schema

The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeat.

Code Block
{
  "apiKey": 76,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
    { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }
  ]
}
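The rule that a member sends every top-level field when it first joins or after an error, and otherwise only what changed, can be sketched on the client side as below. The class and method names are hypothetical, and the sketch assumes MemberId is sent as an empty string until the coordinator returns one.

```java
import java.util.List;

/**
 * Hypothetical client-side sketch of which ShareGroupHeartbeatRequest fields to fill in.
 * Assumption: MemberId is sent as "" until the coordinator returns one.
 */
final class HeartbeatState {
    /** One request; a null RackId or SubscribedTopicNames means "unchanged". */
    static final class Request {
        final String groupId;
        final String memberId;
        final int memberEpoch;
        final String rackId;
        final List<String> subscribedTopicNames;

        Request(String groupId, String memberId, int memberEpoch,
                String rackId, List<String> subscribedTopicNames) {
            this.groupId = groupId;
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
            this.rackId = rackId;
            this.subscribedTopicNames = subscribedTopicNames;
        }
    }

    private final String groupId;
    private final String rackId;               // null if the consumer has no rack
    private List<String> subscription;
    private String memberId = "";
    private int memberEpoch = 0;               // 0 = join the group
    private boolean sendAllFields = true;      // first join, or after an error
    private boolean subscriptionChanged = false;

    HeartbeatState(String groupId, String rackId, List<String> subscription) {
        this.groupId = groupId;
        this.rackId = rackId;
        this.subscription = List.copyOf(subscription);
    }

    /** Builds the next heartbeat, sending the nullable fields only when needed. */
    Request nextHeartbeat() {
        List<String> topics = (sendAllFields || subscriptionChanged) ? subscription : null;
        String rack = sendAllFields ? rackId : null;
        sendAllFields = false;
        subscriptionChanged = false;
        return new Request(groupId, memberId, memberEpoch, rack, topics);
    }

    /** Applies a successful response; MemberId is only present when joining. */
    void onResponse(String responseMemberId, int responseMemberEpoch) {
        if (responseMemberId != null) {
            memberId = responseMemberId;
        }
        memberEpoch = responseMemberEpoch;
    }

    /** After an error, the next heartbeat must carry every top-level field again. */
    void onError() {
        sendAllFields = true;
    }

    void updateSubscription(List<String> topics) {
        subscription = List.copyOf(topics);
        subscriptionChanged = true;
    }

    /** MemberEpoch -1 tells the coordinator that the member is leaving. */
    Request leaveHeartbeat() {
        return new Request(groupId, memberId, -1, null, null);
    }
}
```

In this sketch a join is simply the first heartbeat with MemberEpoch 0; once a response supplies the MemberId, later heartbeats omit RackId and SubscribedTopicNames unless they change or an error occurs.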

Response schema

The group coordinator will only send the Assignment field when it changes.

Code Block
{
  "apiKey": 76,
  "type": "response",
  "name": "ShareGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The member ID generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.", "fields": [
        { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member." }
    ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
    ]}
  ]
}
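Because the coordinator sends Assignment only when it changes, a member has to keep the last assignment it received. A minimal sketch with a hypothetical class name, assuming each non-null Assignment is a complete replacement rather than a delta; topic IDs are shown as strings instead of UUIDs for brevity.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch: applying the Assignment field of ShareGroupHeartbeatResponse.
 * Assumption: a non-null Assignment replaces the previous one entirely.
 */
final class AssignmentTracker {
    private Map<String, List<Integer>> current = Collections.emptyMap();

    /**
     * @param assignment topic ID -> assigned partitions, or null if Assignment was omitted
     * @return true if the member's assignment was replaced
     */
    boolean onHeartbeatResponse(Map<String, List<Integer>> assignment) {
        if (assignment == null) {
            return false; // not sent: the assignment has not changed
        }
        Map<String, List<Integer>> copy = new TreeMap<>();
        assignment.forEach((topicId, partitions) -> copy.put(topicId, List.copyOf(partitions)));
        current = Collections.unmodifiableMap(copy);
        return true;
    }

    Map<String, List<Integer>> current() {
        return current;
    }
}
```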

...

The ShareGroupDescribe API is used to describe share groups.

Request schema

Code Block
{
  "apiKey": 77,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The IDs of the groups to describe." },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
}

Response schema

Code Block
{
  "apiKey": 77,
  "type": "response",
  "name": "ShareGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member rack ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The current member epoch." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
        "about": "The assigned topic-partitions to the member." }
    ]}
  ]
}
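The AuthorizedOperations field packs operations into a 32-bit bitfield. The hypothetical decoder below rests on two assumptions: bit n is set when the operation with numeric code n is authorized, and the default value -2147483648 (Integer.MIN_VALUE) means the operations were not requested, for example because IncludeAuthorizedOperations was false.

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical decoder for AuthorizedOperations in ShareGroupDescribeResponse.
 * Assumptions: bit n set = operation code n authorized; Integer.MIN_VALUE = not requested.
 */
final class AuthorizedOperations {
    static final int NOT_REQUESTED = Integer.MIN_VALUE; // the schema default, -2147483648

    /** Returns the authorized operation codes, or empty if they were not requested. */
    static Optional<Set<Integer>> decode(int bitfield) {
        if (bitfield == NOT_REQUESTED) {
            return Optional.empty();
        }
        Set<Integer> codes = new TreeSet<>();
        for (int bit = 0; bit < Integer.SIZE; bit++) {
            if ((bitfield & (1 << bit)) != 0) {
                codes.add(bit);
            }
        }
        return Optional.of(codes);
    }
}
```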

...

  • If acknowledgements are being made for a partition and no records should be fetched, PartitionMaxBytes  should be set to zero.
  • If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the Topics array with PartitionMaxBytes  set to zero AND the partition is included in ForgottenTopicsData .
  • If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the Topics  array and ShareSessionEpoch  is set to -1. No data will be fetched and it is not necessary to include the partition in ForgottenTopicsData .
  • If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the AcknowledgeErrorCode  in the response will be set to the same value for all partitions which had piggybacked acknowledgements.
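The piggybacking rules above can be summarized per partition. This hypothetical sketch assumes that PartitionMaxBytes is also 0 in the final request of a share session, since no data is fetched then.

```java
/**
 * Hypothetical sketch of the piggybacking rules for one partition of a ShareFetchRequest.
 * Assumption: PartitionMaxBytes is 0 in the closing request as well.
 */
final class PiggybackRules {
    /** What the consumer wants to do with this partition in the next ShareFetch. */
    enum Intent { FETCH, ACKNOWLEDGE_ONLY, ACKNOWLEDGE_AND_REMOVE, ACKNOWLEDGE_AND_CLOSE_SESSION }

    static final class PartitionPlan {
        final int partitionMaxBytes;         // value for PartitionMaxBytes
        final boolean listInForgottenTopics; // also add the partition to ForgottenTopicsData
        final boolean finalRequest;          // send ShareSessionEpoch = -1

        PartitionPlan(int partitionMaxBytes, boolean listInForgottenTopics, boolean finalRequest) {
            this.partitionMaxBytes = partitionMaxBytes;
            this.listInForgottenTopics = listInForgottenTopics;
            this.finalRequest = finalRequest;
        }
    }

    static PartitionPlan plan(Intent intent, int maxBytesWhenFetching) {
        switch (intent) {
            case FETCH:
                return new PartitionPlan(maxBytesWhenFetching, false, false);
            case ACKNOWLEDGE_ONLY:
                return new PartitionPlan(0, false, false); // acknowledge, fetch nothing
            case ACKNOWLEDGE_AND_REMOVE:
                return new PartitionPlan(0, true, false);  // in Topics AND in ForgottenTopicsData
            case ACKNOWLEDGE_AND_CLOSE_SESSION:
                return new PartitionPlan(0, false, true);  // epoch -1; no need to forget it
            default:
                throw new IllegalArgumentException("Unknown intent " + intent);
        }
    }
}
```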

Request schema

For the AcknowledgementBatches of each topic-partition, the FirstOffsets must be in ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.
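The two AcknowledgementBatches rules can be checked as sketched below. The class is hypothetical, and it additionally checks that every type is one of the four values defined in the schema (0:Gap, 1:Accept, 2:Release, 3:Reject).

```java
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical validator for the AcknowledgementBatches of one topic-partition.
 */
final class AcknowledgementBatchValidator {
    static final class Batch {
        final long firstOffset;
        final long lastOffset;         // inclusive
        final byte[] acknowledgeTypes;

        Batch(long firstOffset, long lastOffset, byte... acknowledgeTypes) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.acknowledgeTypes = acknowledgeTypes;
        }
    }

    /** Returns a description of the first violated rule, or empty if the batches are valid. */
    static Optional<String> validate(List<Batch> batches) {
        Batch previous = null;
        for (Batch b : batches) {
            if (b.lastOffset < b.firstOffset) {
                return Optional.of("LastOffset " + b.lastOffset + " precedes FirstOffset " + b.firstOffset);
            }
            // Ascending and non-overlapping: each batch must start after the previous one ends.
            if (previous != null && b.firstOffset <= previous.lastOffset) {
                return Optional.of("Batch at " + b.firstOffset + " overlaps or precedes the batch ending at " + previous.lastOffset);
            }
            long offsets = b.lastOffset - b.firstOffset + 1;
            int types = b.acknowledgeTypes.length;
            // Either one type for the whole batch, or one type per offset.
            if (types != 1 && types != offsets) {
                return Optional.of("Batch at " + b.firstOffset + " has " + types + " types for " + offsets + " offsets");
            }
            for (byte type : b.acknowledgeTypes) {
                if (type < 0 || type > 3) {
                    return Optional.of("Unknown acknowledge type " + type);
                }
            }
            previous = b;
        }
        return Optional.empty();
    }
}
```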

Code Block
{
  "apiKey": 78,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition indexes to forget." }
    ]}
  ]
}
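The ShareSessionEpoch lifecycle (0 to open, -1 to close, incrementing in between) can be tracked on the client as sketched below. The class is hypothetical and makes three assumptions: the epoch advances only after a successful response, it wraps from Integer.MAX_VALUE back to 1 (never to 0 or -1), and either share-session error restarts the session at epoch 0.

```java
/**
 * Hypothetical client-side tracker for ShareSessionEpoch in ShareFetch and ShareAcknowledge.
 * Assumptions: advance only on success; wrap MAX_VALUE -> 1; reset to 0 on session errors.
 */
final class ShareSessionEpoch {
    static final int OPEN = 0;   // opens a new share session
    static final int CLOSE = -1; // closes the share session

    private int nextEpoch = OPEN;

    /** The epoch to put in the next request. */
    int current() {
        return nextEpoch;
    }

    /** Called after a successful response: consecutive requests use increasing epochs. */
    void onSuccess() {
        nextEpoch = (nextEpoch == Integer.MAX_VALUE) ? 1 : nextEpoch + 1;
    }

    /** Called on SHARE_SESSION_NOT_FOUND or INVALID_SHARE_SESSION_EPOCH. */
    void reset() {
        nextEpoch = OPEN;
    }

    /** The epoch for the final request of the session. */
    int closeEpoch() {
        return CLOSE;
    }
}
```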

Response schema

Code Block
{
  "apiKey": 78,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors for ErrorCode and AcknowledgeErrorCode:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top-level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The fetch error code, or 0 if there was no fetch error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The fetch error message, or null if there was no fetch error." },
        { "name": "AcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no acknowledge error." },
        { "name": "AcknowledgeErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The acknowledge error message, or null if there was no acknowledge error." }, 
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "FirstOffset", "type":  "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
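The Records field and the AcquiredRecords ranges are separate in the response, so a client has to match each fetched offset against the acquired ranges. A minimal sketch with hypothetical names, assuming only offsets inside an acquired range are handed to the application, each with the DeliveryCount of its range; record decoding is left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: selecting the acquired offsets from a ShareFetch partition response.
 */
final class AcquiredRecordsFilter {
    static final class AcquiredRange {
        final long firstOffset;
        final long lastOffset;     // inclusive
        final short deliveryCount;

        AcquiredRange(long firstOffset, long lastOffset, short deliveryCount) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.deliveryCount = deliveryCount;
        }
    }

    /**
     * @param recordOffsets offsets of the records decoded from the Records field
     * @return offset -> delivery count, for acquired offsets only, in input order
     */
    static Map<Long, Short> acquired(List<Long> recordOffsets, List<AcquiredRange> ranges) {
        Map<Long, Short> result = new LinkedHashMap<>();
        for (long offset : recordOffsets) {
            for (AcquiredRange range : ranges) {
                if (offset >= range.firstOffset && offset <= range.lastOffset) {
                    result.put(offset, range.deliveryCount);
                    break;
                }
            }
            // Offsets outside every range are not acquired by this member and are skipped.
        }
        return result;
    }
}
```

A nested scan keeps the sketch short; if both lists are in ascending offset order, a two-pointer merge would avoid the inner loop.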

...

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

For the AcknowledgementBatches of each topic-partition, the FirstOffsets must be in ascending order and the ranges must be non-overlapping.

For each AcknowledgementBatch , the array of AcknowledgeType entries can consist of a single value which applies to all entries in the batch, or a separate value for each offset in the batch.

Code Block
{
  "apiKey": 79,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "First offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]}
  ]
}
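A client holding per-offset acknowledgements can pack them into AcknowledgementBatches that satisfy the rules above. In this hypothetical sketch, contiguous offsets share a batch, and a batch whose offsets all have the same type carries a single AcknowledgeTypes entry; otherwise it carries one entry per offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

/**
 * Hypothetical sketch: packing per-offset acknowledgements into AcknowledgementBatches.
 */
final class AcknowledgementEncoder {
    static final byte GAP = 0, ACCEPT = 1, RELEASE = 2, REJECT = 3;

    static final class Batch {
        final long firstOffset;
        final long lastOffset;         // inclusive
        final byte[] acknowledgeTypes;

        Batch(long firstOffset, long lastOffset, byte[] acknowledgeTypes) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.acknowledgeTypes = acknowledgeTypes;
        }
    }

    /** @param acks offset -> acknowledge type, iterated in ascending offset order */
    static List<Batch> encode(SortedMap<Long, Byte> acks) {
        List<Batch> batches = new ArrayList<>();
        List<Byte> pending = new ArrayList<>();
        long first = 0;
        long last = 0;
        for (Map.Entry<Long, Byte> entry : acks.entrySet()) {
            long offset = entry.getKey();
            if (!pending.isEmpty() && offset != last + 1) {
                batches.add(toBatch(first, last, pending)); // a jump in offsets closes the batch
                pending.clear();
            }
            if (pending.isEmpty()) {
                first = offset;
            }
            pending.add(entry.getValue());
            last = offset;
        }
        if (!pending.isEmpty()) {
            batches.add(toBatch(first, last, pending));
        }
        return batches;
    }

    private static Batch toBatch(long first, long last, List<Byte> types) {
        boolean uniform = types.stream().allMatch(t -> t.equals(types.get(0)));
        byte[] encoded = new byte[uniform ? 1 : types.size()];
        for (int i = 0; i < encoded.length; i++) {
            encoded[i] = types.get(i);
        }
        return new Batch(first, last, encoded);
    }
}
```

A caller could mark offsets that carry no record with the Gap type so that a batch stays contiguous; the sketch itself only splits on offsets that are missing from the map.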

Response schema

Code Block
{
  "apiKey": 79,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top-level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}
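The NodeEndpoints array appears intended to let a client redirect acknowledgements for partitions that failed with NOT_LEADER_OR_FOLLOWER, without a separate metadata round trip. The following is a minimal sketch under these assumptions: the response has already been decoded into Python dicts keyed by the schema field names, and NOT_LEADER_OR_FOLLOWER is Kafka's error code 6. The helper name leader_redirects is hypothetical.

```python
NOT_LEADER_OR_FOLLOWER = 6  # Kafka error code for NOT_LEADER_OR_FOLLOWER


def leader_redirects(response: dict) -> dict:
    """Map (topic_id, partition) to the (host, port) of the new leader.

    Only partitions that failed with NOT_LEADER_OR_FOLLOWER and whose
    CurrentLeader.LeaderId appears in NodeEndpoints are returned.
    """
    endpoints = {node["NodeId"]: node for node in response.get("NodeEndpoints", [])}
    redirects = {}
    for topic in response["Responses"]:
        for part in topic["Partitions"]:
            if part["ErrorCode"] != NOT_LEADER_OR_FOLLOWER:
                continue
            node = endpoints.get(part["CurrentLeader"]["LeaderId"])
            if node is None:  # leader unknown (-1) or endpoint not included
                continue
            redirects[(topic["TopicId"], part["PartitionIndex"])] = (node["Host"], node["Port"])
    return redirects
```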

...

The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "AlterShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to alter offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
        "about": "Each partition to alter offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "AlterShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}
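To illustrate how the nested Topics and Partitions arrays are populated, here is a minimal sketch that groups a flat map of desired start offsets into an AlterShareGroupOffsets request body. It assumes the request is represented as plain Python dicts mirroring the schema field names; build_alter_request is a hypothetical helper, not part of any Kafka client.

```python
from collections import defaultdict


def build_alter_request(group_id: str, start_offsets: dict) -> dict:
    """Build an AlterShareGroupOffsets request body.

    start_offsets maps (topic_name, partition_index) to the new
    share-partition start offset.
    """
    by_topic = defaultdict(list)
    # Sort so that topics and partitions appear in a deterministic order.
    for (topic, partition), offset in sorted(start_offsets.items()):
        by_topic[topic].append({"PartitionIndex": partition, "StartOffset": offset})
    return {
        "GroupId": group_id,
        "Topics": [{"TopicName": topic, "Partitions": parts} for topic, parts in by_topic.items()],
    }
```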

...

The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to delete offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeleteShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DeleteShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}
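The supported-error lists for these admin APIs mix coordinator errors, which a client can normally recover from by rediscovering the coordinator or backing off, with errors that need user action. The sketch below encodes one plausible client-side policy for DeleteShareGroupOffsets. The policy and the action names are assumptions, not behaviour defined by this KIP, and errors are identified by name rather than by wire code.

```python
# Hypothetical retry policy for DeleteShareGroupOffsets errors.
RETRY_AFTER_COORDINATOR_LOOKUP = {"NOT_COORDINATOR", "COORDINATOR_NOT_AVAILABLE"}
RETRY_AFTER_BACKOFF = {"COORDINATOR_LOAD_IN_PROGRESS"}


def next_action(error_name: str) -> str:
    """Decide what a client does with one error returned by the group coordinator."""
    if error_name == "NONE":
        return "done"
    if error_name in RETRY_AFTER_COORDINATOR_LOOKUP:
        return "find_coordinator_then_retry"  # coordinator moved or is unavailable
    if error_name in RETRY_AFTER_BACKOFF:
        return "backoff_then_retry"  # coordinator is still loading the group state
    # For example GROUP_AUTHORIZATION_FAILED, GROUP_NOT_EMPTY or INVALID_REQUEST:
    # surface the error to the caller instead of retrying.
    return "fail"
```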

DescribeShareGroupOffsets API

The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to describe offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset."},
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}
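Because errors are reported per partition, a caller typically separates the successful start offsets from the failures. A minimal sketch, assuming the DescribeShareGroupOffsets response is decoded into Python dicts keyed by the schema field names; the helper name start_offsets is hypothetical.

```python
def start_offsets(response: dict):
    """Split a DescribeShareGroupOffsets response into offsets and errors.

    Returns ({(topic_name, partition): start_offset},
             {(topic_name, partition): error_code}).
    """
    offsets, errors = {}, {}
    for topic in response["Responses"]:
        for part in topic["Partitions"]:
            key = (topic["TopicName"], part["PartitionIndex"])
            if part["ErrorCode"] == 0:
                offsets[key] = part["StartOffset"]
            else:
                errors[key] = part["ErrorCode"]
    return offsets, errors
```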

...

The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitializeShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]InitializeStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being initialized." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "InitializeShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]InitializeStateResult", "versions": "0+",
      "about": "The initialization results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}
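The StateEpoch field and the FENCED_STATE_EPOCH error suggest that a share coordinator rejects requests carrying a stale state epoch. The following is a hypothetical in-memory model of that idea, assuming a request is fenced when its StateEpoch is lower than the stored one, and that a StartOffset of -1 leaves any existing start offset unchanged. It is an illustration, not the share coordinator's implementation; ShareStateStore is an invented name.

```python
class ShareStateStore:
    """Hypothetical in-memory model of state-epoch fencing for share-partition state."""

    def __init__(self):
        # (group_id, topic_id, partition) -> {"StateEpoch": int, "StartOffset": int}
        self.state = {}

    def initialize(self, group_id, topic_id, partition, state_epoch, start_offset):
        key = (group_id, topic_id, partition)
        current = self.state.get(key)
        if current is not None and state_epoch < current["StateEpoch"]:
            return "FENCED_STATE_EPOCH"  # request carries a stale state epoch
        if start_offset == -1 and current is not None:
            start_offset = current["StartOffset"]  # -1: start offset not being initialized
        self.state[key] = {"StateEpoch": state_epoch, "StartOffset": start_offset}
        return "NONE"
```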

...

The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}
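Each StateBatch covers a run of offsets that share a delivery state and a delivery count. The sketch below expands the batches from a ReadShareGroupState partition result into per-offset entries, using the delivery-state codes listed in the schema (0: Available, 2: Acked, 4: Archived). It assumes LastOffset is inclusive; expand_batches is a hypothetical helper.

```python
DELIVERY_STATES = {0: "Available", 2: "Acked", 4: "Archived"}


def expand_batches(state_batches: list) -> dict:
    """Return {offset: (delivery_state_name, delivery_count)} for every offset in the batches."""
    per_offset = {}
    for batch in state_batches:
        state = DELIVERY_STATES[batch["DeliveryState"]]
        # Assumption: LastOffset is inclusive, so the range ends at LastOffset + 1.
        for offset in range(batch["FirstOffset"], batch["LastOffset"] + 1):
            per_offset[offset] = (state, batch["DeliveryCount"])
    return per_offset
```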

...

The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being written." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]WriteStateResult", "versions": "0+",
      "about": "The write results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}
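Going the other way, a share-partition leader writing state has to compress its per-offset view into StateBatches. A minimal sketch, assuming offsets are merged into one batch only when they are consecutive and have the same delivery state and delivery count, and that LastOffset is inclusive. The input shape and the helper name to_state_batches are hypothetical.

```python
def to_state_batches(per_offset: dict) -> list:
    """Compress {offset: (delivery_state_code, delivery_count)} into StateBatch dicts."""
    batches = []
    for offset in sorted(per_offset):
        state, count = per_offset[offset]
        last = batches[-1] if batches else None
        if (last is not None
                and last["LastOffset"] + 1 == offset
                and last["DeliveryState"] == state
                and last["DeliveryCount"] == count):
            last["LastOffset"] = offset  # extend the current run
        else:
            batches.append({"FirstOffset": offset, "LastOffset": offset,
                            "DeliveryState": state, "DeliveryCount": count})
    return batches
```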

DeleteShareGroupState API

The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
      "about": "The delete results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupStateSummary API

The ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateSummaryRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]ReadStateSummaryData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateSummaryResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateSummaryResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about": "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}
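Both ReadShareGroupState and ReadShareGroupStateSummary carry a LeaderEpoch and list FENCED_LEADER_EPOCH as an error. A hypothetical model of that check is sketched below, assuming the share coordinator remembers the highest leader epoch seen per share-partition and rejects requests from an older leader. LeaderEpochFence is an invented name; the real coordinator may track epochs differently.

```python
class LeaderEpochFence:
    """Hypothetical model: remembers the highest LeaderEpoch seen per share-partition."""

    def __init__(self):
        self._latest = {}  # (group_id, topic_id, partition) -> leader epoch

    def admit(self, key, leader_epoch: int) -> str:
        latest = self._latest.get(key, -1)
        if leader_epoch < latest:
            return "FENCED_LEADER_EPOCH"  # request comes from a stale leader
        self._latest[key] = leader_epoch
        return "NONE"
```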

Records

This section describes the new record types.

Group and assignment metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets  topic.

With the exception of the ShareGroupStatePartitionMetadata record, all of the record types are analogous to those introduced in KIP-848 for consumer groups.

For each share group, a ShareGroupMetadata record is written for the group epoch, and a ShareGroupPartitionMetadata record is written for the topic-partitions being assigned by the group. When the group is deleted, tombstone records are written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.

For each share group, a ShareGroupTargetAssignmentMetadata record is written to record the group epoch used to compute the assignment. For each member, there is a ShareGroupTargetAssignmentMember record which persists the target assignment, and a ShareGroupCurrentMemberAssignment record which persists the current assignment and is also used to keep track of the member epoch.

There is also a ShareGroupStatePartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
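Because these records live in the compacted __consumer_offsets topic, the current group state is whatever survives after replaying the records in order, with a tombstone (a null value) removing its key. A minimal sketch of that replay, assuming records are given as (key, value) pairs with None standing for a tombstone; the tuple keys used in the example are illustrative only.

```python
def materialize(records):
    """Replay (key, value) records in order; a value of None is a tombstone."""
    state = {}
    for key, value in records:
        if value is None:
            state.pop(key, None)  # tombstone deletes the key
        else:
            state[key] = value  # later records overwrite earlier ones
    return state


log = [
    (("ShareGroupMetadata", "g1"), {"Epoch": 1}),
    (("ShareGroupMemberMetadata", "g1", "m1"), {"ClientId": "c1"}),
    (("ShareGroupMetadata", "g1"), {"Epoch": 2}),
    (("ShareGroupMemberMetadata", "g1", "m1"), None),  # member left
]
```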

ShareGroupMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataKey",
  "validVersions": "11",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "11",
      "about": "The group id." }
  ]
}

ShareGroupMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." }
  ]
}

ShareGroupPartitionMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataKey",
  "validVersions": "9",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "9",
      "about": "The group id." }
  ]
}

ShareGroupPartitionMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata",
      "about": "The list of topic metadata.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid",
        "about": "The topic id." },
      { "name": "TopicName", "versions": "0+", "type": "string",
        "about": "The topic name." },
      { "name": "NumPartitions", "versions": "0+", "type": "int32",
        "about": "The number of partitions of the topic." },
      { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata",
        "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored.", "fields": [
        { "name": "Partition", "versions": "0+", "type": "int32",
          "about": "The partition number." },
        { "name": "Racks", "versions": "0+", "type": "[]string",
          "about": "The set of racks that the partition is mapped to." }
      ]}
    ]}
  ]
}
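The PartitionMetadata description says an empty list is stored when rack information is unavailable for all partitions. A minimal sketch of building that field from a hypothetical {partition: [rack, ...]} map is shown below. It assumes that, once any partition has rack information, every partition gets an entry (possibly with an empty Racks list) and that racks are de-duplicated and sorted; neither detail is specified by the schema.

```python
def partition_metadata(num_partitions: int, racks_by_partition: dict) -> list:
    """Build the PartitionMetadata list for one topic."""
    # No rack information for any partition: store an empty list.
    if not any(racks_by_partition.get(p) for p in range(num_partitions)):
        return []
    return [
        {"Partition": p, "Racks": sorted(set(racks_by_partition.get(p, [])))}
        for p in range(num_partitions)
    ]
```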

ShareGroupMemberMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataKey",
  "validVersions": "10",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "10",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "10",
      "about": "The member id." }
  ]
}

ShareGroupMemberMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string",
      "about": "The (optional) rack id." },
    { "name": "ClientId", "versions": "0+", "type": "string",
      "about": "The client id." },
    { "name": "ClientHost", "versions": "0+", "type": "string",
      "about": "The client host." },
    { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string",
      "about": "The list of subscribed topic names." }
  ]
}

...

ShareGroupTargetAssignmentMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMetadataKey",
  "validVersions": "12",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "12",
      "about": "The group id." }
  ]
}

ShareGroupTargetAssignmentMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
      "about": "The assignment epoch." }
  ]
}

ShareGroupTargetAssignmentMemberKey

Code Block
{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMemberKey",
  "validVersions": "13",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "13",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "13",
      "about": "The member id." }
  ]
}

...

ShareGroupTargetAssignmentMemberValue

Code Block
{
  "type": "data",
  "name": "ShareGroupTargetAssignmentMemberValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicPartitions", "versions": "0+", "type": "[]TopicPartition",
      "about": "The assigned partitions.", "fields": [
      { "name": "TopicId", "versions": "0+", "type": "uuid" },
      { "name": "Partitions", "versions": "0+", "type": "[]int32" }
    ]}
  ]
}

ShareGroupCurrentMemberAssignmentKey

Code Block
{
  "type": "data",
  "name": "ShareGroupCurrentMemberAssignmentKey",
  "validVersions": "14",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "14",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "14",
      "about": "The member id." }
  ]
}

ShareGroupCurrentMemberAssignmentValue

Code Block
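{
  "type": "data",
  "name": "ShareGroupCurrentMemberAssignmentValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "MemberEpoch", "versions": "0+", "type": "int32",
      "about": "The current member epoch that is expected from the member in the heartbeat request." },
    { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32",
      "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." },
    { "name": "State", "versions": "0+", "type": "int8",
      "about": "The member state. See MemberState for the possible values." },
    { "name": "AssignedPartitions", "versions": "0+", "type": "[]TopicPartitions",
      "about": "The partitions assigned to (or owned by) this member." }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic Id." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partition Ids." }
    ]}
  ]
}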
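The MemberEpoch and PreviousMemberEpoch fields of ShareGroupCurrentMemberAssignmentValue let the coordinator tolerate a member that missed its last epoch bump. The following is a minimal sketch of that check. It assumes the coordinator accepts exactly these two epochs. The class and method names are hypothetical, and any further validation the coordinator performs is not shown.

```java
/**
 * Hypothetical sketch: validating the epoch sent in a share group heartbeat against
 * the MemberEpoch and PreviousMemberEpoch fields of ShareGroupCurrentMemberAssignmentValue.
 * This illustrates the purpose of the fields; it is not the group coordinator's code.
 */
public final class MemberEpochCheck {

    private MemberEpochCheck() { }

    /** In-memory view of the two epoch fields of the record (illustrative only). */
    public record CurrentAssignment(int memberEpoch, int previousMemberEpoch) { }

    /**
     * Accepts the epoch the coordinator expects, or the previous epoch, which covers
     * a member whose last epoch bump was lost before it reached the member.
     */
    public static boolean isAcceptable(CurrentAssignment stored, int receivedEpoch) {
        return receivedEpoch == stored.memberEpoch()
            || receivedEpoch == stored.previousMemberEpoch();
    }

    public static void main(String[] args) {
        CurrentAssignment stored = new CurrentAssignment(5, 4);
        System.out.println(isAcceptable(stored, 5)); // up to date
        System.out.println(isAcceptable(stored, 4)); // missed the last epoch bump
        System.out.println(isAcceptable(stored, 3)); // too old, rejected
    }
}
```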

Records

This section describes the new record types.

Group metadata

In order to coexist properly with consumer groups, the group metadata records for share groups are persisted by the group coordinator to the compacted __consumer_offsets  topic.

For each share group, a single ShareGroupMetadata record is written. When the group is deleted, a tombstone record is written.

For each member, there is a ShareGroupMemberMetadata record which enables group membership to continue seamlessly across a group coordinator change. When the member leaves, a tombstone record is written.

There is also a ShareGroupPartitionMetadata record which contains a list of all share-partitions which have been assigned in the share group. It is used to keep track of which share-partitions have persistent state.
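Because __consumer_offsets is compacted, only the latest record for each key matters when the state is reloaded. A tombstone (a record with a null value) removes its key. The following Java sketch replays a log in that way. It shows why the tombstone written when a member leaves, or when the group is deleted, drops that entry from the rebuilt state. The string keys and values are simplified, hypothetical stand-ins for the ShareGroupMetadata and ShareGroupMemberMetadata records, not their serialized form. The sketch also ignores when the log cleaner physically removes old records.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of rebuilding state from a compacted topic: latest value per key wins. */
public final class CompactedReplay {

    private CompactedReplay() { }

    /** A simplified record: a key and a value, where a null value is a tombstone. */
    public record LogRecord(String key, String value) { }

    /** Replays records in log order, keeping the latest value per key and dropping tombstoned keys. */
    public static Map<String, String> replay(List<LogRecord> log) {
        Map<String, String> state = new LinkedHashMap<>();
        for (LogRecord r : log) {
            if (r.value() == null) {
                state.remove(r.key());         // tombstone: the key is deleted
            } else {
                state.put(r.key(), r.value()); // a newer value replaces the older one
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<LogRecord> log = List.of(
            new LogRecord("ShareGroupMetadata/g1", "epoch=1"),
            new LogRecord("ShareGroupMemberMetadata/g1/m1", "client=a"),
            new LogRecord("ShareGroupMemberMetadata/g1/m2", "client=b"),
            new LogRecord("ShareGroupMetadata/g1", "epoch=2"),
            new LogRecord("ShareGroupMemberMetadata/g1/m1", null)); // member m1 left the group
        System.out.println(replay(log));
    }
}
```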

ShareGroupMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataKey",
  "validVersions": "11",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "11",
      "about": "The group id." }
  ]
}

ShareGroupMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Epoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." }
  ]
}

ShareGroupMemberMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupMemberMetadataKey",
  "validVersions": "10",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "10",
      "about": "The group id." },
    { "name": "MemberId", "type": "string", "versions": "10",
      "about": "The member id." }
  ]
}

...

ShareGroupStatePartitionMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupStatePartitionMetadataKey",
  "validVersions": "15",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "15",
      "about": "The group id." }
  ]
}

ShareGroupStatePartitionMetadataValue

Code Block
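{
  "type": "data",
  "name": "ShareGroupStatePartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo",
      "about": "The topics with initialized share-group state." },
    { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo",
      "about": "The topics whose share-group state is being deleted." }
  ],
  "commonStructs": [
    { "name": "TopicPartitionsInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+", "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "The partitions." }
    ]},
    { "name": "TopicInfo", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+", "about": "The topic name." }
    ]}
  ]
}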

ShareGroupPartitionMetadataKey

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataKey",
  "validVersions": "9",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "9",
      "about": "The group id." }
  ]
}

ShareGroupPartitionMetadataValue

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicMetadata",
      "about": "The topics with initialized share-group state." }
  ],
  "commonStructs": [
    { "name": "TopicMetadata", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions." }
    ]}
  ]
}

Share-group state

These records are written by the share coordinator on the __share_group_state  topic.

...

Code Block
{
  "type": "data",
  "name": "ShareSnapshotKey",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." }
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareSnapshotValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "StateEpoch", "type": "int32", "versions": "0+",
      "about": "The state epoch of the share-partition." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
      "about": "The leader epoch of the share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareUpdateKey",
  "validVersions": "1",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "1",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "1",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "1",
      "about": "The partition index." }
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareUpdateValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
      "about": "The leader epoch of the share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset, or -1 if the start offset is not being updated." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}
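ShareSnapshotValue and ShareUpdateValue carry the same StateBatch structure. In a ShareUpdateValue, a StartOffset of -1 means the start offset is not being updated. The following is a minimal sketch of how a share coordinator could rebuild a share-partition's state by applying updates on top of a snapshot. For illustration only, it assumes two merge rules: a later batch overrides earlier state for every offset it covers, and state below the start offset is discarded. This section does not spell out the exact rules. The class names are hypothetical, and StateEpoch and LeaderEpoch are not modeled. The delivery-state codes come from the "about" text of the StateBatch fields.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: replaying ShareUpdateValue records on top of a ShareSnapshotValue. */
public final class ShareStateReplay {

    /** Delivery states as documented for StateBatch: 0 Available, 2 Acked, 4 Archived. */
    public enum DeliveryState {
        AVAILABLE(0), ACKED(2), ARCHIVED(4);

        private final int code;

        DeliveryState(int code) {
            this.code = code;
        }

        public static DeliveryState fromCode(int code) {
            for (DeliveryState s : values()) {
                if (s.code == code) {
                    return s;
                }
            }
            throw new IllegalArgumentException("Unknown delivery state: " + code);
        }
    }

    /** Mirrors the StateBatch fields (int64, int64, int8, int16). */
    public record StateBatch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) { }

    /** State of a single offset; tracked per offset only to keep the sketch simple. */
    public record OffsetState(DeliveryState state, short deliveryCount) { }

    private long startOffset;
    private final TreeMap<Long, OffsetState> offsets = new TreeMap<>();

    /** Initializes from a snapshot: its start offset and state batches. */
    public ShareStateReplay(long snapshotStartOffset, List<StateBatch> snapshotBatches) {
        this.startOffset = snapshotStartOffset;
        apply(snapshotBatches);
    }

    /** Applies an update; updateStartOffset == -1 leaves the start offset unchanged. */
    public void applyUpdate(long updateStartOffset, List<StateBatch> updateBatches) {
        if (updateStartOffset != -1) {
            startOffset = updateStartOffset;
            offsets.headMap(startOffset).clear(); // assumption: drop state below the new start offset
        }
        apply(updateBatches);
    }

    /** Assumption: a later batch overrides the state of every offset it covers. */
    private void apply(List<StateBatch> batches) {
        for (StateBatch b : batches) {
            OffsetState s = new OffsetState(DeliveryState.fromCode(b.deliveryState()), b.deliveryCount());
            for (long o = Math.max(b.firstOffset(), startOffset); o <= b.lastOffset(); o++) {
                offsets.put(o, s);
            }
        }
    }

    public long startOffset() {
        return startOffset;
    }

    public Map<Long, OffsetState> offsets() {
        return Collections.unmodifiableMap(offsets);
    }
}
```

Expanding batches offset by offset keeps the example short, but it is not how compact batch ranges would be handled in practice.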

Metrics

Broker

...

metrics

The following new broker metrics should be added:

group-count
  Type: Gauge; Group: group-coordinator-metrics; Tags: protocol: share
  Description: The total number of share groups managed by the group coordinator.
  JMX Bean: kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share

rebalance (rebalance-rate and rebalance-count)
  Type: Meter; Group: group-coordinator-metrics; Tags: protocol: share
  Description: The rate and total count of share group rebalances.
  JMX Beans:
    kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share
    kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share

num-partitions
  Type: Gauge; Group: group-coordinator-metrics; Tags: protocol: share
  Description: The number of share partitions managed by the group coordinator.
  JMX Bean: kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share

group-count
  Type: Gauge; Group: group-coordinator-metrics; Tags: protocol: share, state: {Empty|Stable|Dead}
  Description: The number of share groups in each state.
  JMX Bean: kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={Empty|Stable|Dead}

share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)
  Type: Meter; Group: share-group-metrics
  Description: The total number of acknowledgement requests.
  JMX Beans:
    kafka.server:type=share-group-metrics,name=share-acknowledgement-rate
    kafka.server:type=share-group-metrics,name=share-acknowledgement-count

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)
  Type: Meter; Group: share-group-metrics; Tags: ack-type: {Accept,Release,Reject}
  Description: The number of records acknowledged per acknowledgement type.
  JMX Beans:
    kafka.server:type=share-group-metrics,name=record-acknowledgement-rate,ack-type={Accept,Release,Reject}
    kafka.server:type=share-group-metrics,name=record-acknowledgement-count,ack-type={Accept,Release,Reject}

partition-load-time (partition-load-time-avg and partition-load-time-max)
  Type: Meter; Group: share-group-metrics
  Description: The time taken to load the share partitions.
  JMX Beans:
    kafka.server:type=share-group-metrics,name=partition-load-time-avg
    kafka.server:type=share-group-metrics,name=partition-load-time-max

partition-load-time (partition-load-time-avg and partition-load-time-max)
  Type: Meter; Group: share-coordinator-metrics
  Description: The time taken in milliseconds to load the share-group state from the share-group state partitions.
  JMX Beans:
    kafka.server:type=share-coordinator-metrics,name=partition-load-time-avg
    kafka.server:type=share-coordinator-metrics,name=partition-load-time-max

thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg)
  Type: Meter; Group: share-coordinator-metrics
  Description: The fraction of time the share coordinator thread is idle.
  JMX Beans:
    kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-min
    kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-avg

write (write-rate and write-total)
  Type: Meter; Group: share-coordinator-metrics
  Description: The rate (per second) and total number of share-group state write calls.
  JMX Beans:
    kafka.server:type=share-coordinator-metrics,name=write-rate
    kafka.server:type=share-coordinator-metrics,name=write-total

write-latency (write-latency-avg and write-latency-max)
  Type: Meter; Group: share-coordinator-metrics
  Description: The time taken for a share-group state write call, including the time to write to the share-group state topic.
  JMX Beans:
    kafka.server:type=share-coordinator-metrics,name=write-latency-avg
    kafka.server:type=share-coordinator-metrics,name=write-latency-max

num-partitions
  Type: Gauge; Group: share-coordinator-metrics
  Description: The number of partitions in the share-group state topic.
  JMX Bean: kafka.server:type=share-coordinator-metrics,name=num-partitions

The group coordinator uses metrics in the group group-coordinator-metrics . The share-partition leader uses metrics in the group share-group-metrics . The share coordinator uses metrics in the group share-coordinator-metrics .
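Each JMX bean name in the broker metrics list follows the JMX ObjectName notation. It starts with a domain such as kafka.server. Then come key properties: type carries the metric group, name carries the metric name, and any tags follow as further key=value pairs. The following hypothetical helper assembles such a name from those columns. It parses the result with javax.management.ObjectName, which rejects malformed names. It illustrates only the naming pattern in the list, not how the broker registers its beans.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper that assembles JMX bean names in the pattern used by the metrics list. */
public final class MetricBeanNames {

    private MetricBeanNames() { }

    /** Builds "domain:type=group,name=metric[,tag=value...]" and parses it as a JMX ObjectName. */
    public static ObjectName beanName(String domain, String group, String metric, Map<String, String> tags) {
        StringBuilder sb = new StringBuilder(domain)
            .append(":type=").append(group)
            .append(",name=").append(metric);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            sb.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        try {
            return new ObjectName(sb.toString());
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Not a valid JMX name: " + sb, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("protocol", "share");
        tags.put("state", "Stable"); // one of Empty, Stable or Dead
        ObjectName name = beanName("kafka.server", "group-coordinator-metrics", "group-count", tags);
        System.out.println(name.getKeyProperty("type") + " / " + name.getKeyProperty("state"));
    }
}
```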

Client metrics

The following new client metrics should be added. Following the terminology in KIP-714, these are standard metrics, as opposed to required metrics.

last-poll-seconds-ago
  Type: Gauge; Group: consumer-share-metrics; Tags: client-id
  Description: The number of seconds since the last poll() invocation.
  JMX Bean: kafka.consumer:type=consumer-share-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): org.apache.kafka.consumer.share.last.poll.seconds.ago

time-between-poll-avg
  Type: Meter; Group: consumer-share-metrics; Tags: client-id
  Description: The average delay between invocations of poll() in milliseconds.
  JMX Bean: kafka.consumer:type=consumer-share-metrics,name=time-between-poll-avg,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.time.between.poll.avg

time-between-poll-max
  Type: Meter; Group: consumer-share-metrics; Tags: client-id
  Description: The maximum delay between invocations of poll() in milliseconds.
  JMX Bean: kafka.consumer:type=consumer-share-metrics,name=time-between-poll-max,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.time.between.poll.max

poll-idle-ratio-avg
  Type: Meter; Group: consumer-share-metrics; Tags: client-id
  Description: The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records.
  JMX Bean: kafka.consumer:type=consumer-share-metrics,name=poll-idle-ratio-avg,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.poll.idle.ratio.avg

heartbeat-response-time-max
  Type: Meter; Group: consumer-share-coordinator-metrics; Tags: client-id
  Description: The maximum time taken to receive a response to a heartbeat request in milliseconds.
  JMX Bean: kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-response-time-max,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.coordinator.heartbeat.response.time.max

heartbeat-rate
  Type: Meter; Group: consumer-share-coordinator-metrics; Tags: client-id
  Description: The number of heartbeats per second.
  JMX Bean: kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-rate,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.coordinator.heartbeat.rate

heartbeat-total
  Type: Meter; Group: consumer-share-coordinator-metrics; Tags: client-id
  Description: The total number of heartbeats.
  JMX Bean: kafka.consumer:type=consumer-share-coordinator-metrics,name=heartbeat-total,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.coordinator.heartbeat.total

last-heartbeat-seconds-ago
  Type: Gauge; Group: consumer-share-coordinator-metrics; Tags: client-id
  Description: The number of seconds since the last coordinator heartbeat was sent.
  JMX Bean: kafka.consumer:type=consumer-share-coordinator-metrics,name=last-heartbeat-seconds-ago,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.coordinator.last.heartbeat.seconds.ago

rebalance-total
  Type: Meter; Group: consumer-share-coordinator-metrics; Tags: client-id
  Description: The total number of rebalance events.
  JMX Bean: kafka.consumer:type=consumer-share-coordinator-metrics,name=rebalance-total,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.coordinator.rebalance.total

rebalance-rate-per-hour
  Type: Meter; Group: consumer-share-coordinator-metrics; Tags: client-id
  Description: The number of rebalance events per hour.
  JMX Bean: kafka.consumer:type=consumer-share-coordinator-metrics,name=rebalance-rate-per-hour,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.coordinator.rebalance.rate.per.hour

fetch-size-avg
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The average number of bytes fetched per request.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-size-avg,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.fetch.size.avg

fetch-size-max
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The maximum number of bytes fetched per request.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-size-max,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.fetch.size.max

bytes-fetched-rate
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The average number of bytes fetched per second.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=bytes-fetched-rate,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.bytes.fetched.rate

bytes-fetched-total
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The total number of bytes fetched.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=bytes-fetched-total,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.bytes.fetched.total

records-per-request-avg
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The average number of records in each request.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-per-request-avg,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.records.per.request.avg

records-per-request-max
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The maximum number of records in a request.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-per-request-max,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.records.per.request.max

records-fetched-rate
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The average number of records fetched per second.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-fetched-rate,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.records.fetched.rate

records-fetched-total
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The total number of records fetched.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=records-fetched-total,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.records.fetched.total

acknowledgements-send-rate
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The average number of record acknowledgements sent per second.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-send-rate,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.acknowledgements.send.rate

acknowledgements-send-total
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The total number of record acknowledgements sent.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-send-total,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.acknowledgements.send.total

acknowledgements-error-rate
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The average number of record acknowledgements that resulted in errors per second.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-error-rate,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.acknowledgements.error.rate

acknowledgements-error-total
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The total number of record acknowledgements that resulted in errors.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=acknowledgements-error-total,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.acknowledgements.error.total

fetch-latency-avg
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The average time taken for a fetch request.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-latency-avg,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.fetch.latency.avg

fetch-latency-max
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The maximum time taken for any fetch request.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-latency-max,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.fetch.latency.max

fetch-rate
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The number of fetch requests per second.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-rate,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.fetch.rate

fetch-total
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The total number of fetch requests.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-total,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.fetch.total

fetch-throttle-time-avg
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The average throttle time in milliseconds.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-throttle-time-avg,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.fetch.throttle.time.avg

fetch-throttle-time-max
  Type: Meter; Group: consumer-share-fetch-manager-metrics; Tags: client-id
  Description: The maximum throttle time in milliseconds.
  JMX Bean: kafka.consumer:type=consumer-share-fetch-manager-metrics,name=fetch-throttle-time-max,client-id=([-.\w]+)
  Telemetry metric name (KIP-714): o.a.k.consumer.share.fetch.manager.fetch.throttle.time.max
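The telemetry metric names above follow a mechanical pattern. Each starts with the prefix org.apache.kafka. (abbreviated o.a.k. in most entries), followed by the metric group with its -metrics suffix removed and then the metric name, with every dash replaced by a dot. The following sketch of that mapping is derived only from the entries listed here. KIP-714 remains the authoritative definition, and the class name is hypothetical.

```java
/** Hypothetical sketch of the group/name to telemetry-name pattern seen in the client metrics list. */
public final class TelemetryNames {

    private static final String PREFIX = "org.apache.kafka.";
    private static final String GROUP_SUFFIX = "-metrics";

    private TelemetryNames() { }

    /** Strips "-metrics" from the group, joins group and metric name, and turns dashes into dots. */
    public static String telemetryName(String group, String metric) {
        String g = group.endsWith(GROUP_SUFFIX)
            ? group.substring(0, group.length() - GROUP_SUFFIX.length())
            : group;
        return PREFIX + g.replace('-', '.') + "." + metric.replace('-', '.');
    }

    public static void main(String[] args) {
        // Should match the entry for heartbeat-response-time-max.
        System.out.println(telemetryName("consumer-share-coordinator-metrics", "heartbeat-response-time-max"));
    }
}
```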

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

...

At these stages, KIP-932 can be used for familiarization and experimentation, but not production use. It is disabled in the default configuration for the cluster, and must be explicitly enabled. Doing so is not appropriate in a production cluster.

To turn on the feature, add "share" to the group.coordinator.rebalance.protocols configuration. There is no support for upgrade or downgrade.

...

To upgrade a cluster, it is first necessary to perform a rolling upgrade of the cluster to a software version which supports share groups. Then, the new protocol is enabled using the kafka-features.sh tool by setting a group.version which supports share groups. Finally, the group.coordinator.rebalance.protocols configuration is changed to add "share" to the list of enabled rebalance protocols.
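The final upgrade step is a list edit on group.coordinator.rebalance.protocols. The following minimal sketch performs that edit on a java.util.Properties object. It appends share only if it is absent and keeps the existing entries in order. The starting value classic,consumer in the example is illustrative and is not a statement of the broker default. If the property is not set at all, the broker default applies, and this hypothetical helper does not model that case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: adds "share" to group.coordinator.rebalance.protocols. */
public final class EnableShareProtocol {

    public static final String KEY = "group.coordinator.rebalance.protocols";

    private EnableShareProtocol() { }

    /** Appends "share" to the comma-separated list if missing, keeping existing entries and order. */
    public static void enableShare(Properties props) {
        List<String> protocols = new ArrayList<>();
        for (String p : props.getProperty(KEY, "").split(",")) {
            String trimmed = p.trim();
            if (!trimmed.isEmpty()) {
                protocols.add(trimmed);
            }
        }
        if (!protocols.contains("share")) {
            protocols.add("share");
        }
        props.setProperty(KEY, String.join(",", protocols));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(KEY, "classic,consumer");
        enableShare(props);
        System.out.println(props.getProperty(KEY));
    }
}
```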

This KIP builds upon KIP-848 which introduced the new group coordinator and the new records for the __consumer_offsets  topic. The pre-KIP-848 group coordinator will not recognize the new records, so this downgrade is not supported.

Downgrading to a software version that supports the new group coordinator but does not support share groups is supported. KIP-932 adds new records to the __consumer_offsets topic which will not be understood by that version of the group coordinator, so the group coordinator will ignore them. The __share_group_state topic will be unused, because that version has no share coordinator, and it can be deleted manually.

...