Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: ShareGroupHeartbeat confirms assignment, admin and metrics tweaks

...

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the new partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata record with the number array of partitions for the topic increased.

In order to remove a topic, the group coordinator:

  • If the topic still exists, it writes a ShareGroupPartitionMetadata record with the topic added to the DeletingTopics  array. This enables group coordinator failover to continue the deletion. If the topic does not exist, the requirement to continue deletion can be inferred by the non-existence of the topic.
  • Sends the DeleteShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata record with the topic removed.

...

Method signatureDescription
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets)Alter offset information for a share group.
AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options) Alter offset information for a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitionsSet<String> topics)Delete offset information for a set of partitions topics in a share group.
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitionsSet<String> topics, DeleteShareGroupOffsetsOptions options) Delete offset information for a set of partitions topics in a share group.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds)Delete share groups from the cluster.
DeleteShareGroupResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupOptions options) Delete share groups from the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds)Describe some share groups in the cluster.
DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds, DescribeShareGroupsOptions options) Describe some share groups in the cluster.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs)List the share group offsets available in the cluster for the specified share groups.
ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options) List the share group offsets available in the cluster for the specified share groups.
ListShareGroupsResult listShareGroups()List the share groups available in the cluster.
ListShareGroupsResult listShareGroups(ListShareGroupsOptions options) List the share groups available in the cluster.
ListGroupsResult listGroups() List the groups available in the cluster.
ListGroupsResult listGroups(ListGroupsOptions options) List the groups available in the cluster.

...

Code Block
languagejava
    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This is a convenience method for {@link #alterShareGroupOffsets(String, Map, AlterShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition.
     * @return The AlterShareGroupOffsetsResult.
     */
    default AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
        return alterShareGroupOffsets(groupId, offsets, new AlterShareGroupOffsetsOptions());
    }

    /**
     * Alters offsets for the specified group. In order to succeed, the group must be empty.
     *
     * <p>This operation is not transactional so it may succeed for some partitions while fail for others.
     *
     * @param groupId The group for which to alter offsets.
     * @param offsets A map of offsets by partition. Partitions not specified in the map are ignored.
     * @param options The options to use when altering the offsets.
     * @return The AlterShareGroupOffsetsResult.
     */
    AlterShareGroupOffsetsResult alterShareGroupOffsets(String groupId, Map<TopicPartition, Long> offsets, AlterShareGroupOffsetsOptions options);

    /**
     * Delete offsets for a set of partitionstopics in a share group with the default
     * options. This will succeed at the partition level only if the group is not actively
     * subscribed to the corresponding topic.
     *
     * <p>This is a convenience method for {@link #deleteShareGroupOffsets(String, MapSet, DeleteShareGroupOffsetsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupId The group for which to delete offsets.
     * @param topics The topics.
     * @return The DeleteShareGroupOffsetsResult.
     */
    default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition>Set<String> partitionstopics) {
        return deleteShareGroupOffsets(groupId, partitionstopics, new DeleteShareGroupOffsetsOptions());
    }

    /**
     * Delete offsets for a set of partitionstopics in a share group. This will
     *
 succeed at the partition level* only@param ifgroupId theThe group isfor which notto activelydelete subscribedoffsets.
     * to@param thetopics correspondingThe topictopics.
     *
        * @param options The options to use when deleting offsets in a share group.
     * @return The DeleteShareGroupOffsetsResult.
     */
    DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId,
        Set<TopicPartition>Set<String> partitionstopics,
        DeleteShareGroupOffsetsOptions options);

    /**
     * Delete share groups from the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #deleteShareGroups(Collection<String>, DeleteShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @param groupIds The IDs of the groups to delete.
     * @return The DeleteShareGroupsResult.
     */
    default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
        return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
    }

    /**
     * Delete share groups from the cluster.
     *
     * @param groupIds The IDs of the groups to delete.
     * @param options The options to use when deleting a share group.
     * @return The DeleteShareGroupsResult.
     */
    DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);

    /**
     * Describe some share groups in the cluster, with the default options.
     *
     * <p>This is a convenience method for {@link #describeShareGroups(Collection, DescribeShareGroupsOptions)}
     * with default options. See the overload for more details.
     *
     * @param groupIds The IDs of the groups to describe.
     * @return The DescribeShareGroupsResult.
     */
    default DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds) {
        return describeShareGroups(groupIds, new DescribeShareGroupsOptions());
    }

    /**
     * Describe some share groups in the cluster.
     *
     * @param groupIds The IDs of the groups to describe.
     * @param options  The options to use when describing the groups.
     * @return The DescribeShareGroupsResult.
     */
    DescribeShareGroupsResult describeShareGroups(Collection<String> groupIds,
                                                 DescribeShareGroupsOptions options);

    /**
     * List the share group offsets available in the cluster for the specified share groups with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}
     * to list offsets of all partitions for the specified share groups with default options.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @return The ListShareGroupOffsetsResult
     */
    default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs) {
        return listShareGroupOffsets(groupSpecs, new ListShareGroupOffsetsOptions());
    }

    /**
     * List the share group offsets available in the cluster for the specified share groups.
     *
     * @param groupSpecs Map of share group ids to a spec that specifies the topic partitions of the group to list offsets for.
     * @param options The options to use when listing the share group offsets.
     * @return The ListShareGroupOffsetsResult
     */
    ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGroupOffsetsSpec> groupSpecs, ListShareGroupOffsetsOptions options);

    /**
     * List the share groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listShareGroups(ListShareGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListShareGroupsResult.
     */
    default ListShareGroupsResult listShareGroups() {
        return listShareGroups(new ListShareGroupsOptions());
    }

    /**
     * List the share groups available in the cluster.
     *
     * @param options The options to use when listing the share groups.
     * @return The ListShareGroupsResult.
     */
    ListShareGroupsResult listShareGroups(ListShareGroupsOptions options);
 
    /**
     * List the groups available in the cluster with the default options.
     *
     * <p>This is a convenience method for {@link #listGroups(ListGroupsOptions)} with default options.
     * See the overload for more details.
     *
     * @return The ListGroupsResult.
     */
    default ListGroupsResult listGroups() {
        return listGroups(new ListGroupsOptions());
    }

    /**
     * List the groups available in the cluster.
     *
     * @param options The options to use when listing the groups.
     * @return The ListGroupsResult.
     */
    ListGroupsResult listGroups(ListGroupsOptions);

...

Code Block
languagejava
package org.apache.kafka.clients.admin;
 
/**
 * The result of the {@link Admin#deleteShareGroupOffsets(String, Set<TopicPartition>Set<String>, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsResult {
    /**
     * Return a future which succeeds only if all the deletions succeed.
     */
    public KafkaFuture<Void> all() {
    }

    /**
     * Return a future which can be used to check the result for a given partitiontopic.
     */
    public KafkaFuture<Void> partitionResult(final TopicPartitionString partitiontopic) {
    }
}

DeleteShareGroupOffsetsOptions

Code Block
languagejava
package org.apache.kafka.client.admin;
 
/**
 * Options for the {@link Admin#deleteShareGroupOffsets(String, Set<TopicPartition>Set<String>, DeleteShareGroupOffsetsOptions)} call.
 * <p>
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class DeleteShareGroupOffsetsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions> {
}

...

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Pass in groups to delete topic partition offsets over the entire share group. For instance --group g1 --group g2

--delete-offsets

Delete offsets of share group. Supports one share group at the time, and multiple topics.

--describe

Describe share group and list offset lag (number of records not yet processed) related to given group.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--help

Print usage information.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of and may be used with the '--describe' option only.

--reset-offsets

Reset offsets of share group. Supports one share group at a time, and instances must be inactive. If neither '--dry-run' nor '–execute' is specified, 

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Stable' and 'Dead'.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000)   

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or topic which should be included in the reset offset process. In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included.

--version

Display Kafka version.

...

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group identifier." },
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID generated by the coordinator. The member ID must be kept during the entire lifetime of the member." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
      { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
        "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the partitions owned by the member.", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "SubscribedTopicNamesPartitions", "type": "[]stringint32", "versions": "0+",
 "nullableVersions": "0+", "default": "null",
        "about": "nullThe if it didn't change since the last heartbeat; the subscribed topic names otherwise." partitions." }
    ]}
  ]
}

Response schema

The group coordinator will only send the Assignment field when it changes.

...

The AlterShareGroupOffsets API is used to alter the share-partition start offsets for the share-partitions in a share group. The share-partition leader handles group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "AlterShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to alter offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+",
        "about": "Each partition to alter offsets for.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

...

The DeleteShareGroupOffsets API is used to delete the offsets for the share-partitions in a share group. The share-partition leader handles group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DeleteShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to delete offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupOffsetsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED (version 0+)
  // - NOT_COORDINATOR (version 0+)
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - GROUP_NOT_EMPTY (version 0+)
  // - KAFKA_STORAGE_ERROR (version 0+)
  // - INVALID_REQUEST (version 0+)
  // - UNKNOWN_SERVER_ERROR (version 0+)
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]DeleteShareGroupOffsetsResponseTopic", "versions": "0+",
      "about": "The results for each topic.", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]DeleteShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
    "about": "The topic name." },
      { "name": "PartitionIndexTopicId", "type": "int32uuid", "versions": "0+",
 "ignorable": true,
        "about": "The unique partitiontopic indexID." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "ignorable": true, "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

DescribeShareGroupOffsets API

The DescribeShareGroupOffsets API is used to describe the offsets for the share-partitions in a share group. The share-partition leader handles group coordinator serves this API.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "DescribeShareGroupOffsetsRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", "versions": "0+",
      "about": "The topics to describe offsets for.",  "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]int32", "versions": "0+",
        "about": "The partitions." }
      ]}
    ]}
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsMetadata",
      "about": "The topics with initialized share-group state." },
    { "name": "DeletingTopics", "versions": "0+",
 ", "type": "[]TopicMetadata",
      "about": "The topics whose share-group state is being deleted." }
  ],
  "commonStructs": [
    { "name": "TopicPartitionsMetadata", "versions": "0+", "fields": [
      { "name": "InitializedTopicsTopicId", "versionstype": "0+uuid", "typeversions": "[]TopicMetadata0+",
        "about": "The topics with initialized share-group statetopic identifier." }
  ],
  "commonStructs": [
    { "name": "TopicName", "type": "TopicMetadatastring", "versions": "0+",
        "fieldsabout": ["The topic name." },
      { "name": "TopicIdPartitions", "type": "uuid[]int32", "versions": "0+",
        "about": "The topic identifierpartitions." }
    ]},
    { "name": "TopicMetadata", "versions": "0+", "fields": [
      { "name": "TopicNameTopicId", "type": "stringuuid", "versions": "0+",
        "about": "The topic nameidentifier." },
      { "name": "NumPartitionsTopicName", "type": "int32string", "versions": "0+",
        "about": "The numbertopic of partitionsname." }
      ]}
  ]
}

Share-group state

These records are written by the share coordinator on the __share_group_state  topic.

...

The following new client metrics should be added:

Metric Name

Type

Group

Tags

Description

JMX Bean

last-poll-seconds-ago

Gauge

share-consumer-metrics

client-id 

The number of seconds since the last poll() invocation.

kafka.consumer:type=share-consumer-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+) 

time-between-poll-avg

Meter

share-consumer-metrics

client-id 

The average delay between invocations of poll() in milliseconds.

kafka.consumer:type=share-consumer-metrics,name=time-between-poll-avg,client-id=([-.\w]+) 

time-between-poll-max

Meter

share-consumer-metrics

client-id 

The

max

maximum delay between invocations of poll() in milliseconds.

kafka.consumer:type=share-consumer-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+) 

poll-idle-ratio-avg

Meter

share-consumer-metrics

client-id 

The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records.

kafka.consumer:type=share-consumer-metrics,name=poll-idle-ratio-avg,client-id=([-.\w]+) 

heartbeat-response-time-max

Meter

share-consumer-coordinator-metrics

client-id 

The

max

maximum time taken to receive a response to a heartbeat request in milliseconds.

kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-response-time-max,client-id=([-.\w]+) 

heartbeat-rate

Meter

share-consumer-coordinator-metrics

client-id 

The number of heartbeats per second.

kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-rate,client-id=([-.\w]+) 

heartbeat-total

Meter

share-consumer-coordinator-metrics

client-id 

The total number of heartbeats.

kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-total,client-id=([-.\w]+) 

last-heartbeat-seconds-ago

Gauge

share-consumer-coordinator-metrics

client-id 

The number of seconds since the last coordinator heartbeat was sent.

kafka.consumer:type=share-consumer-coordinator-metrics,name=last-heartbeat-seconds-ago,client-id=([-.\w]+) 

rebalance-latency-avg

Meter

share-consumer-coordinator-metrics

client-id 

The average time taken for a group to complete a rebalance in milliseconds.

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-latency-avg,client-id=([-.\w]+) 

rebalance-latency-max

Meter

share-consumer-coordinator-metrics

client-id 

The

max

maximum time taken for a group to complete a rebalance in milliseconds.

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-latency-max,client-id=([-.\w]+) 

rebalance-latency-total

Meter

share-consumer-coordinator-metrics

client-id 

The total number of milliseconds spent in rebalances.

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-latency-total,client-id=([-.\w]+) 

rebalance-total

Meter

share-consumer-coordinator-metrics

client-id 

The total number of rebalance events.

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-total,client-id=([-.\w]+) 

rebalance-rate-per-hour

Meter

share-consumer-coordinator-metrics

client-id 

The number of rebalance events per hour.

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-rate-per-hour,client-id=([-.\w]+) 

fetch-size-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of bytes fetched per request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-size-avg,client-id=([-.\w]+) 

fetch-size-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The maximum number of bytes fetched per request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-size-max,client-id=([-.\w]+) 

bytes-fetched-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of bytes fetched per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=bytes-fetched-rate,client-id=([-.\w]+) 

bytes-fetched-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of bytes fetched.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=bytes-fetched-total,client-id=([-.\w]+) 

records-per-request-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of records in each request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-per-request-avg,client-id=([-.\w]+) 

records-per-request-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The maximum number of records in a request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-per-request-max,client-id=([-.\w]+) 

records-fetched-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of records fetched per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-fetched-rate,client-id=([-.\w]+) 

records-fetched-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of records fetched.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-fetched-total,client-id=([-.\w]+) 

acknowledgements-send-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of record acknowledgements sent per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-send-rate,client-id=([-.\w]+) 

acknowledgements-send-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of record acknowledgements sent.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-send-total,client-id=([-.\w]+) 

acknowledgements-error-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of record acknowledgements that resulted in errors per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-error-rate,client-id=([-.\w]+) 

acknowledgements-error-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of record acknowledgements that resulted in errors.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-error-total,client-id=([-.\w]+) 

fetch-latency-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average time taken for a fetch request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-latency-avg,client-id=([-.\w]+) 

fetch-latency-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The

max

maximum time taken for any fetch request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-latency-max,client-id=([-.\w]+) 

fetch-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The number of fetch requests per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-rate,client-id=([-.\w]+) 

fetch-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of fetch requests.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-total,client-id=([-.\w]+) 

fetch-throttle-time-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average throttle time in

ms

milliseconds.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-throttle-time-avg,client-id=([-.\w]+) 

fetch-throttle-time-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The maximum throttle time in

ms

milliseconds.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-throttle-time-max,client-id=([-.\w]+) 

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

...