Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Schema and API changes for review comments

...

When a share-partition leader receives its first ShareFetch request for a topic-partition, it needs to initialize its share-partition state. It finds the share coordinator using the FindCoordinator RPC using (key: "groupId:topicId:partition", key_type: SHARE ). Then, it sends the ReadShareGroupState RPC to the share coordinator. If the share coordinator has no share-partition state to return, it returns the UNKNOWN_TOPIC_OR_PARTITION error code indicating that this share-partition is not actually part of the share group. Otherwise, it returns the state to the share-partition leader which uses it to initialize and begin fetching records for the consumers. The SPSO returned might be -1 indicating that the initial SPSO needs to be set based on the group.share.auto.offset.reset  configuration.

The share-partition leader must be aware of when the group coordinator being used to alter the SPSO with an AlterShareGroupOffsets  request. This only occurs when the group is empty. As a result, when the set of share sessions transitions from 0 to 1, the share-partition leader uses the ReadShareGroupOffsetsState  RPC to validate its state epoch (this request is much cheaper for the share coordinator to handle than ReadShareGroupState ). We know that there are no acquired records, so re-initializing the share-partition leader is non-disruptive. If the state epoch has changed, the share-partition leader issues a ReadShareGroupState  RPC to the share coordinator and uses the response to re-initialize.  

When a share-partition leader needs to update the durable share-partition state because of an acknowledgement or other state changed (such as a lock timeout), it sends the WriteShareGroupState RPC to the share coordinator. The share coordinator keeps track of the accumulated state of the share-partition and chooses how to record it to the share-state topic. Once it has successfully written to the topic and replication has completed, the RPC response is sent.

...

Method signatureDescription
void onComplete(Map<TopicIdPartition, Set<OffsetAndMetadata>> Set<Long>> offsets, Exception exception) 

A callback method the user can implement to provide asynchronous handling of acknowledgement completion.

Parameters:

offsets - A map of the offsets that this callback applies to.

exception - The exception thrown during processing of the request, or null if the acknowledgement completed successfully.

Exceptions:

InvalidRecordStateException - if the record state prevented the acknowledgement from succeeding.

WakeupException - if KafkaShareConsumer.wakeup() is called.

InterruptException - if the calling thread is interrupted.

AuthorizationException - if not authorized to the topic or group.

KafkaException - for any other unrecoverable errors.

...

Method signatureDescription
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> Long> offsets)Alter offset information for a share group.
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> Long> offsets, AlterShareGroupOffsetsOptions options) Alter offset information for a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions)Delete offset information for a set of partitions in a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) Delete offset information for a set of partitions in a share group.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds)Delete share groups from the cluster.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupOptions options) Delete share groups from the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds)Describe some share groups in the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) Describe some share groups in the cluster.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs)List the share group offsets available in the cluster for the specified share groups.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) List the share group offsets available in the cluster for the specified share groups.
ListShareGroupsResult listShareGroups()List the share groups available in the cluster.
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) List the share groups available in the cluster.
ListGroupsResult listGroups() List the groups available in the cluster.
ListGroupsResult listGroups(ListGroupsOptions options) List the groups available in the cluster.

...

Code Block
languagejava
    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata.
     * @return The AlterShareGroupOffsetsResult.
     */
    default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata>Long> offsets) {
        return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
    }

    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This operation is not transactional so it may succeed for some partitions while fail for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition with associated metadata. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterShareGroupOffsetsResult.
     */
    AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata>Long> offsets, AlterShareGroupOffsetsOptions options);

    /**
     * Delete offsets for a set of partitions in a share group with the default
     * options. This will succeed at the partition level only if the group is not actively
     * subscribed to the corresponding topic.
     *
     * <p>This is a convenience method for {@link #deleteShareGroupOffsets(String, Map, DeleteShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The DeleteShareGroupOffsetsResult.
     */
    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) {
        return deleteShareGroupOffsets(groupId, partitions, new DeleteShareGroupOffsetsOptions());
    }

    /**
     * Delete offsets for a set of partitions in a share group. This will
     * succeed at the partition level only if the group is not actively subscribed
     * to the corresponding topic.
     *
     * @param options The options to use when deleting offsets in a share group.
     * @return The DeleteShareGroupOffsetsResult.
     */
    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
        Set<TopicPartition> partitions,
        DeleteShareGroupOffsetsOptions options);

    /**
     * Delete share groups from the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupIds The IDs of the groups to delete.
     * @return The DeleteShareGroupsResult.
     */
    default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
    }

    /**
     * Delete share groups from the cluster.
     *
     * @param groupIds The IDs of the groups to delete.
     * @param options The options to use when deleting a share group.
     * @return The DeleteShareGroupsResult.
     */
    DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);

    /**
     * Describe some share groups in the cluster, with the default options.
     *
     * <p>This is a convenience method for {@link #describeShareGroups(Collection, DescribeShareGroupsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.
     * @return The DescribeShareGroupsResult.
     */
    default DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) {
        return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
    }

    /**
     * Describe some share groups in the cluster.
     *
     * @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * @return The DescribeShareGroupsResult.
     */
    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
                                                 DescribeShareGroupsOptions options);

    /**
     * List the share group offsets available in the cluster for the specified share groups with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}
     * to list offsets of all partitions for the specified share groups with default options.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @return The ListShareGroupOffsetsResult
     */
    default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) {
        return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
    }

    /**
     * List the share group offsets available in the cluster for the specified share groups.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @param options The options to use when listing the share group offsets.
     * @return The ListShareGroupOffsetsResult
     */
    ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options);

    /**
     * List the share groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroups(ListShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListShareGroupsResult.
     */
    default ListShareGroupsResult listShareGroups() {
        return listShareGroups(new ListShareGroupsOptions());
    }

    /**
     * List the share groups available in the cluster.
     *
     * @param options The options to use when listing the share groups.
     * @return The ListShareGroupsResult.
     */
    ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);
 
    /**
     * List the groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listGroups(ListGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListGroupsResult.
     */
    default ListGroupsResult listGroups() {
        return listGroups(new ListGroupsOptions());
    }

    /**
     * List the groups available in the cluster.
     *
     * @param options The options to use when listing the groups.
     * @return The ListGroupsResult.
     */
    ListGroupsResult listGroups(ListGroupsOptions);

...

Code Block
languagejava
package org.apache.kafka.clients.admin;

/**
 * The result of the {@link Admin#alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata>Long>), AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsResult {
    /**
     * Return a future which succeeds if all the alter offsets succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partition.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
    }
}

...

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#alterShareGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata>Long>), AlterShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterShareGroupOffsetsOptions extends AbstractOptions<AlterShareGroupOffsetsOptions> {
}

...

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec>, ListShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
    /**
     * Return a future which yields all Map<String, Map<TopicPartition, OffsetAndMetadata>Long> objects, if requests for all the groups succeed.
     */
    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>>Long>>> all() {
    }

    /**
     * Return a future which yields a map of topic partitions to OffsetAndMetadata objectsoffsets for the specified group.
     */
    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>Long>> partitionsToOffsetAndMetadatapartitionsToOffset(String groupId) {
    }
}

ListShareGroupOffsetsOptions

...

OptionDescription
--bootstrap-server <String: server to connect to>REQUIRED: The server(s) to connect to.
--consumer-config <String: config file>Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--consumer-property <String: consumer_prop>

Consumer property in the form key=value.

--enable-systest-events

Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)

--formatter <String: class>

The name of a class to use for formatting Kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)

--formatter-config <String: config file>

Config properties file to initialize the message formatter. Note that [property] takes precedence of this config.

--group <String: share groud id>

The share group id of the consumer. (default: "console-share-consumer" )

--help

Print usage information.

--key-deserializer <String: deserializer for keys>

The name of the class to use for deserializing keys.

--max-messages <Integer: num_messages>

The maximum number of messages to consume before exiting. If not set, consumption is continual.

--property <String: prop>

The properties to initialize the message formatter. Default properties include:

 print.timestamp=true|false

 print.key=true|false

 print.offset=true|false

 print.delivery=true|false

 print.partition=true|false

 print.headers=true|false

 print.value=true|false

 key.separator=<key.separator>

 line.separator=<line.separator>

 headers.separator=<line.separator>

 null.literal=<null.literal>

 key.deserializer=<key.deserializer>

 value.deserializer=<value.deserializer>

 header.deserializer=<header.deserializer>

Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.', 'value.deserializer.' and 'headers.deserializer.' prefixes to configure their deserializers.

--reject

If specified, messages are rejected as they are consumed.

--reject-message-on-error

If there is an error when processing a message, reject it instead of halting.

--release

If specified, messages are released as they are consumed.

--timeout-ms <Integer: timeout_ms>

If specified, exit if no message is available for consumption for the specific interval.

--topic <String: topic>

REQUIRED: The topic to consume from.

--value-deserializer <String: deserializer for values>

The name of the class to use for deserializing values.

--version

Display Kafka version.

...

  • ShareGroupHeartbeat - for consumers to form and maintain share groups
  • ShareGroupDescribe - for describing share groups
  • ShareFetch - for fetching records from share-partition leaders
  • ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
  • AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
  • DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
  • DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group
  • InitializeShareGroupState  - for initializing share-partition state on a share-coordinator
  • ReadShareGroupState - for reading share-partition state from a share coordinator

  • WriteShareGroupState - for writing share-partition state to a share coordinator

  • DeleteShareGroupState - for deleting share-partition state from a share coordinator

  • ReadShareGroupOffsetsState  - for reading the offsets from the share-partition state from a share coordinator

The KIP It also introduces version 5 of FindCoordinatorRequest to add the new key type of FindCoordinatorRequest.CoordinatorType.SHARE with value 2. There is no change to the schema.new versions of the following APIs:

  • FindCoordinator  - for finding coordinators, to support share coordinators
  • ListGroups  - for listing groups, to support listing share groups

Error codes

This KIP adds the following error codes the Kafka protocol.

  • INVALID_RECORD_STATE  - The record state is invalid. The acknowledgement of delivery could not be completed.
  • SHARE_SESSION_NOT_FOUND  - The share session is not found.
  • INVALID_SHARE_SESSION_EPOCH  - The share session epoch is invalid.
  • FENCED_STATE_EPOCH  - The share coordinator rejected the request because share-group state epoch did not match.

ShareGroupHeartbeat API

FindCoordinator API

The KIP introduces version 5.

Request schema

Version 5 adds the new key type of FindCoordinatorRequest.CoordinatorType.SHARE with value 2 with the key of "group:topicId:partition".

Code Block
{
  "apiKey": 10,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "FindCoordinatorRequest",
  // Version 1 adds KeyType.
  //
  // Version 2 is the same as version 1.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds support for batching via CoordinatorKeys (KIP-699)
  //
  // Version 5 adds support for share groups.
  "validVersions": "0-5",
  "deprecatedVersions": "0",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "Key", "type": "string", "versions": "0-3",
      "about": "The coordinator key." },
    { "name": "KeyType", "type": "int8", "versions": "1+", "default": "0", "ignorable": false,
      "about": "The coordinator key type. (Group, transaction, share, etc.)" },
    { "name": "CoordinatorKeys", "type": "[]string", "versions": "4+",
      "about": "The coordinator keys." }
  ]
}

Response schema

Version 5 is the same as version 4.

ListGroups API

This KIP introduces version 6 

Request schema

Version 6 adds support for share groups.

Code Block
{
  "apiKey": 16,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListGroupsRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the StatesFilter field (KIP-518).
  //
  // Version 5 adds the TypesFilter field (KIP-848).
  //
  // Version 6 adds support for share groups.
  "validVersions": "0-6",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "4+",
      "about": "The states of the groups we want to list. If empty, all groups are returned with their state." },
    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
      "about": "The types of the groups we want to list. If empty, all groups are returned with their type." }
  ]
}

Response schema

Version 6 is the same as version 5.

ShareGroupHeartbeat API

The ShareGroupHeartbeat API is used by The ShareGroupHeartbeat API is used by share group consumers to form a group. The API allows members to advertise their subscriptions and their state. The group coordinator uses it to assign partitions to and revoke partitions from members. This API is also used as a liveness check.

Request schema

The member must set all the top-level fields when it joins for the first time or when an error occurs. Otherwise, it is expected only to fill in the fields which have changed since the last heartbeat.

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group identifier." },
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
      { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
        "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
      { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
        "about": "-1 if it didn't chance since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
      { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." }
  ]
}

Response schema

The group coordinator will only send the Assignment field when it changes.

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ShareGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_MEMBER_ID (version 0+)
  // - GROUP_MAX_SIZE_REACHED (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The member ID generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.", "fields": [
        { "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "AssignedTopicPartitions", "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member." }
    ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
    ]}
  ]
}

...

The ShareGroupDescribe API is used to describe share groups.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The ids of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - INVALID_GROUP_ID (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
            { "name": "MemberId", "type": "string", "versions": "0+",
              "about": "The member ID." },
            { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
              "about": "The member rack ID." },
            { "name": "MemberEpoch", "type": "int32", "versions": "0+",
              "about": "The current member epoch." },
            { "name": "ClientId", "type": "string", "versions": "0+",
              "about": "The client ID." },
            { "name": "ClientHost", "type": "string", "versions": "0+",
              "about": "The client host." },
            { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
              "about": "The subscribed topic names." },
            { "name": "Assignment", "type": "Assignment", "versions": "0+",
              "about": "The current assignment." }
          ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
      ]
    }
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
    ]},
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
        "about": "The assigned topic-partitions to the member." },
      { "name": "Error", "type": "int8", "versions": "0+",
        "about": "The assigned error." },
      { "name": "MetadataVersion", "type": "int32", "versions": "0+",
        "about": "The assignor metadata version." },
      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
        "about": "The assignor metadata bytes." }
    ]}
  ]
}

ShareFetch API

...

  • If acknowledgements are being made for a partition and no records should be fetched, PartitionMaxBytes  should be set to zero.
  • If acknowledgements are being made for a partition which is being removed from the share session, the partition is included in the Topics array with PartitionMaxBytes  set to zero AND the partition is included in ForgottenTopicsData .
  • If acknowledgements are being made for a partition in the final request in a share session, the partition is included in the Topics  array and ShareSessionEpoch  is set to -1. No data will be fetched and it is not necessary to include the partition in ForgottenTopicsData .
  • If there's an error which affects all piggybacked acknowledgements but which does not prevent data from being fetched, the AcknowledgeErrorCode  in the response will be set to the same value for all partitions which had piggybacked acknowledgements.

Request schema

For the AcknowledgementBatches of each topic-partition, the BaseOffsets  must be ascending order and the ranges must be non-overlapping.

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareFetchRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "Base offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]},
    { "name": "ForgottenTopicsData", "type": "[]ForgottenTopic", "versions": "0+", "ignorable": false,
      "about": "The partitions to remove from this share session.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions indexes to forget." }
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors for ErrorCode and AcknowledgeErrorCode:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+) - only for AcknowledgeErrorCode
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - CORRUPT_MESSAGE (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top-level response error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level response error code." },
 error message, or null if there was no error." },
     { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "ResponsesPartitions", "type": "[]ShareFetchableTopicResponsePartitionData", "versions": "0+",
        "about": "The responsetopic topicspartitions.", "fields": [
        { "name": "TopicIdPartitionIndex", "type": "uuidint32", "versions": "0+", "ignorable": true,
          "about": "The uniquepartition topic IDindex." },
        { "name": "PartitionsErrorCode", "type": "[]PartitionDataint16", "versions": "0+",
          "about": "The topic partitions.", "fields": [fetch error code, or 0 if there was no fetch error." },
        { "name": "PartitionIndexErrorMessage", "type": "int32string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The partition indexfetch error message, or null if there was no fetch error." },
            { "name": "ErrorCodeAcknowledgeErrorCode", "type": "int16", "versions": "0+",
          "about": "The acknowledge error code, or 0 if there was no fetchacknowledge error." },
        { "name": "AcknowledgeErrorCodeAcknowledgeErrorMessage", "type": "int16string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The acknowledge error codemessage, or 0null if there was no acknowledge error." }, 
         { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields":  [
          {"name": "BaseOffset", "type":  "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

...

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

For the AcknowledgementBatches of each topic-partition, the BaseOffsets  must be ascending order and the ranges must be non-overlapping.

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The member ID." },
    { "name": "ShareSessionEpoch", "type": "int32", "versions": "0+",
      "about": "The current share session epoch: 0 to open a share session; -1 to close it; otherwise increments for consecutive requests." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "Base offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "AcknowledgeTypes", "type": "[]int8", "versions": "0+",
            "about": "Array of acknowledge types - 0:Gap,1:Accept,2:Release,3:Reject."}
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - TOPIC_AUTHORIZATION_FAILED (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - SHARE_SESSION_NOT_FOUND (version 0+)
  // - INVALID_SHARE_SESSION_EPOCH (version 0+) 
  // - NOT_LEADER_OR_FOLLOWER (version 0+)
  // - UNKNOWN_TOPIC_ID (version 0+)
  // - INVALID_RECORD_STATE (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "ErrorCodeErrorMessage", "type": "int16string", "versions": "0+", "nullableVersions": "ignorable"0+", "default": true"null",
      "about": "The top level response error code-level error message, or null if there was no error." },
     { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID."},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "0+",
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

...

The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The share-partition leader handles this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "AlterShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to alter offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
        "about": "Each partition to alter offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "AlterShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The share-partition leader handles this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to delete offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeleteShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DeleteShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The share-partition leader handles this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to describe offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DescribeShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DescribeShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DescribeShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset."},
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

The InitializeShareGroupState API is used by the group coordinator to initialize the share-partition state. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitializeShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]InitializeStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being initialized." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "InitializeShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]InitializeStateResult", "versions": "0+",
      "about": "The initialization results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [ "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "PartitionErrorCode", "type": "int32int16", "versions": "0+",
          "about": "The partition index error code, or 0 if there was no error." },
        { "name": "ErrorCodeErrorMessage", "type": "int16string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error codemessage, or 0null if there was no error." }
        ]}
    ]}
  ]
}

ReadShareGroupState API

The ReadShareGroupState API is used by share-partition leaders to read share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "The base offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "State", "type": "int8", "versions": "0+",
            "about": "The state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

...

The WriteShareGroupState API is used by share-partition leaders to write share-partition state to a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being written." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "BaseOffset", "type": "int64", "versions": "0+",
            "about": "The base offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "State", "type": "int8", "versions": "0+",
            "about": "The state - 0:Available,2:Acked,4:Archived" },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]WriteStateResult", "versions": "0+",
      "about": "The write results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorCodeErrorMessage", "type": "int16string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error codemessage, or 0null if there was no error." }
        ]}
    ]}
  ]
}

DeleteShareGroupState API

The DeleteShareGroupState API is used by the group coordinator to delete share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]DeleteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
      "about": "The delete results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

ReadShareGroupOffsetsState API

The ReadShareGroupOffsetsState API is used by the group coordinator to read the offset information from share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupOffsetsStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]ReadOffsetsStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupOffsetsStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadOffsetsStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch for this share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

...