
...

Share groups do not have the ASSIGNING state because only server-side assignors are supported, and do not need the RECONCILING state because there’s no need for all members to converge before the group enters the STABLE state. There is no automatic expiration of share groups.

  • EMPTY - When a share group is created or the last member leaves the group, the share group is EMPTY.
  • STABLE - When a share group has active members, the share group is STABLE.
  • DEAD - When the share group remains EMPTY for a configured period, the group coordinator transitions it to DEAD to delete it. This only happens if the group does not have any persistent share-group state. Share groups are intentionally more durable than consumer groups.
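
As a minimal illustration of this lifecycle, the states and transitions described above can be sketched as follows (illustrative only, not the broker's implementation):

Code Block
languagejava
/**
 * Sketch of the share group lifecycle described above.
 * For illustration only; not the broker's implementation.
 */
enum ShareGroupState {
    EMPTY,   // just created, or the last member has left
    STABLE,  // the group has active members
    DEAD;    // remained EMPTY past the configured period with no persistent state

    // The group moves between EMPTY and STABLE as members join and leave.
    static ShareGroupState onMemberCountChange(int activeMembers) {
        return activeMembers > 0 ? STABLE : EMPTY;
    }
}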

...

The share coordinator will prefer to write a snapshot over an update (for example, when the SPSO moves and there are no in-flight records, the snapshot will be small and there’s no need to write an update instead). The share coordinator will take a snapshot periodically, frequently enough to minimise the number of ShareUpdate records to replay but rarely enough to minimise the performance cost of taking snapshots.
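
As an illustration of this trade-off, the snapshot-versus-update decision could be sketched like this, where maxUpdatesBetweenSnapshots is a hypothetical knob standing in for the real policy:

Code Block
languagejava
/**
 * Sketch of the snapshot-versus-update choice described above.
 * maxUpdatesBetweenSnapshots is a hypothetical knob, not a real configuration.
 */
final class SnapshotPolicy {
    private final int maxUpdatesBetweenSnapshots;
    private int updatesSinceSnapshot;

    SnapshotPolicy(int maxUpdatesBetweenSnapshots) {
        this.maxUpdatesBetweenSnapshots = maxUpdatesBetweenSnapshots;
    }

    /** Returns true if the next write should be a ShareSnapshot rather than a ShareUpdate. */
    boolean shouldWriteSnapshot(boolean spsoMovedWithNoInFlightRecords) {
        // A snapshot with no in-flight records is small, so prefer it outright.
        // Otherwise, snapshot periodically to bound the number of ShareUpdates to replay.
        if (spsoMovedWithNoInFlightRecords || updatesSinceSnapshot >= maxUpdatesBetweenSnapshots) {
            updatesSinceSnapshot = 0;
            return true;
        }
        updatesSinceSnapshot++;
        return false;
    }
}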

There are two kinds of fencing for share-group state.

The records also include a state epoch. This is used to ensure that all of the components involved are aligned on the current state, and to fence any calls to write to an old version of the state. Whenever the share-group state is initialized, the state epoch is set to the share group's current group epoch. This gives a very simple way to make sure that reads and writes refer to the current version of the state.

The records also include a leader epoch. Whenever the share-partition leader calls the share coordinator, it provides the leader epoch of the partition in the request. The share coordinator uses this to fence zombie share-partition leaders. When a new leader is elected for a share-partition, the leader epoch of the partition is incremented. This means that the new leader will use a higher leader epoch in its requests to the share coordinator, and any trailing requests from earlier share-partition leaders can be rejected with FENCED_LEADER_EPOCH . The leader epoch is persisted by the share coordinator in its ShareSnapshot record. When a new leader epoch is received, it causes the share coordinator to write a new ShareSnapshot record. When a new state epoch is used, the leader epoch is initialized to -1 , and it is properly initialized when a share-partition leader makes a request.
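
Putting the two epochs together, the validation the share coordinator performs on a write can be sketched as follows (simplified types; the real checks live in the share coordinator):

Code Block
languagejava
/**
 * Sketch of the epoch fencing described above, using simplified types.
 * For illustration only; not the share coordinator's implementation.
 */
final class ShareStateEpochs {
    private int stateEpoch;        // set from the group epoch when the state is initialized
    private int leaderEpoch = -1;  // -1 until a share-partition leader makes a request

    enum Result { OK, FENCED_STATE_EPOCH, FENCED_LEADER_EPOCH }

    Result validateWrite(int requestStateEpoch, int requestLeaderEpoch) {
        if (requestStateEpoch < stateEpoch) {
            return Result.FENCED_STATE_EPOCH;   // write against an old version of the state
        }
        if (requestLeaderEpoch < leaderEpoch) {
            return Result.FENCED_LEADER_EPOCH;  // trailing request from an earlier leader
        }
        if (requestLeaderEpoch > leaderEpoch) {
            leaderEpoch = requestLeaderEpoch;   // new leader; causes a new ShareSnapshot record
        }
        return Result.OK;
    }
}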

The records have the following content (note that the version number is used to differentiate between the record types, just as for the consumer-offsets topic):

...

They are all subclasses of RetriableException .

Broker API

...

ConsumerGroupPartitionAssignor

The new org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor  interface is implemented by server-side assignors for consumer groups. It signifies that the partition assignor is suitable for use with consumer groups.

Code Block
languagejava
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Server-side partition assignor for consumer groups used by the GroupCoordinator.
 *
 * The interface is kept in an internal module until KIP-848 is fully
 * implemented and ready to be released.
 */
@InterfaceStability.Unstable
public interface ConsumerGroupPartitionAssignor extends PartitionAssignor {
}

The two built-in partition assignors for consumer groups, org.apache.kafka.coordinator.group.assignor.RangeAssignor  and org.apache.kafka.coordinator.group.assignor.UniformAssignor , are both changed to implement this interface instead of org.apache.kafka.coordinator.group.assignor.PartitionAssignor  because they are intended only for use with consumer groups.

ShareGroupPartitionAssignor

The new org.apache.kafka.coordinator.group.assignor.ShareGroupPartitionAssignor  interface is implemented by server-side assignors for share groups. It signifies that the partition assignor is suitable for use with share groups.

Code Block
languagejava
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Server-side partition assignor for share groups used by the GroupCoordinator.
 *
 * The interface is kept in an internal module until KIP-932 is fully
 * implemented and ready to be released.
 */
@InterfaceStability.Unstable
public interface ShareGroupPartitionAssignor extends PartitionAssignor {
}

One implementation of this interface, org.apache.kafka.coordinator.group.share.SimpleAssignor, is provided.
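
As a sketch of what a custom server-side assignor might look like, the following hypothetical implementation assigns every subscribed partition to every member, which is valid for share groups because members may consume from the same partitions. The AssignmentSpec, MemberAssignment and related types follow KIP-848's server-side assignor API, so exact signatures may differ between versions:

Code Block
languagejava
package com.example.assignor; // hypothetical package

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.*;

/**
 * Hypothetical assignor giving every member every subscribed partition.
 * Sketch only; type names follow KIP-848's server-side assignor API and
 * may differ between Kafka versions.
 */
public class AllPartitionsAssignor implements ShareGroupPartitionAssignor {
    @Override
    public String name() {
        return "all-partitions"; // hypothetical assignor name
    }

    @Override
    public GroupAssignment assign(AssignmentSpec spec, SubscribedTopicDescriber topics)
            throws PartitionAssignorException {
        Map<String, MemberAssignment> assignments = new HashMap<>();
        spec.members().forEach((memberId, member) -> {
            // Give this member all partitions of every topic it subscribes to.
            Map<Uuid, Set<Integer>> target = new HashMap<>();
            for (Uuid topicId : member.subscribedTopicIds()) {
                Set<Integer> partitions = new HashSet<>();
                for (int p = 0; p < topics.numPartitions(topicId); p++) {
                    partitions.add(p);
                }
                target.put(topicId, partitions);
            }
            assignments.put(memberId, new MemberAssignment(target));
        });
        return new GroupAssignment(assignments);
    }
}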

Command-line tools

kafka-share-groups.sh

A new tool called kafka-share-groups.sh is added for working with share groups. It has the following options:

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Pass in groups to delete topic partition offsets over the entire share group. For instance --group g1 --group g2

--delete-offsets

Delete offsets of a share group. Supports one share group at a time, and multiple topics.

--describe

Describe the share group and list the offset lag (number of records not yet processed) for the given group.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--help

Print usage information.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action and may be used with the '--describe' option only.

--reset-offsets

Reset offsets of a share group. Supports one share group at a time, and instances must be inactive. If neither '--dry-run' nor '--execute' is specified, the tool behaves as if '--dry-run' were specified.

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. The valid values are 'Empty', 'Stable' and 'Dead'.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000)   

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or included in the reset offset process. In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partitions to be included.

--version

Display Kafka version.
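
As examples of using the tool (host, group and topic names are illustrative):

Code Block
languagebash
# List all share groups
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list

# Describe a share group and its members
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group g1 --members

# Preview resetting offsets for partitions 0, 1 and 2 of topic t1 to the earliest offset
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-earliest \
  --group g1 --topic t1:0,1,2 --dry-run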

...

Configuration

Description

Values

group.share.enable

Whether to enable share groups on the broker.

Default false while the feature is being developed. This is an internal configuration.

group.coordinator.rebalance.protocols

The list of enabled rebalance protocols. (Existing configuration)

"share"  is included in the list of protocols to enable share groups. This will be added to the default value of this configuration property once this feature is complete.

group.share.delivery.count.limit

The maximum number of delivery attempts for a record delivered to a share group.

Default 5, minimum 2, maximum 10

group.share.record.lock.duration.ms

Share-group record acquisition lock duration in milliseconds.

Default 30000 (30 seconds), minimum 1000 (1 second), maximum 60000 (60 seconds)

group.share.min.record.lock.duration.ms

Share-group record acquisition lock minimum duration in milliseconds.

Default 15000 (15 seconds), minimum 1000 (1 second), maximum 30000 (30 seconds)

group.share.max.record.lock.duration.ms

Share-group record acquisition lock maximum duration in milliseconds.

Default 60000 (60 seconds), minimum 30000 (30 seconds), maximum 3600000 (1 hour)

group.share.partition.max.record.locks

Share-group record lock limit per share-partition.

Default 200, minimum 100, maximum 10000
group.share.session.timeout.ms 

The timeout to detect client failures when using the group protocol.

Default 45000 (45 seconds)
group.share.min.session.timeout.ms 

The minimum session timeout.

Default 45000 (45 seconds)
group.share.max.session.timeout.ms 

The maximum session timeout.

Default 60000 (60 seconds)
group.share.heartbeat.interval.ms 

The heartbeat interval given to the members.

Default 5000 (5 seconds)
group.share.min.heartbeat.interval.ms 

The minimum heartbeat interval.

Default 5000 (5 seconds)
group.share.max.heartbeat.interval.ms 

The maximum heartbeat interval.

Default 15000 (15 seconds)
group.share.max.groups 

The maximum number of share groups.

Default 10, minimum 1, maximum 100
group.share.max.size 

The maximum number of consumers that a single share group can accommodate.

Default 200, minimum 10, maximum 1000
group.share.assignors 

The server-side assignors as a list of full class names. The list must contain only a single entry which is used by all groups. In the future, it is envisaged that a group configuration will be provided to allow each group to choose one of the list of assignors.

A list of class names, currently limited to a single entry. Default "org.apache.kafka.coordinator.group.share.SimpleAssignor"
share.coordinator.state.topic.num.partitions 

The number of partitions for the share-group state topic (should not change after deployment).

Default 50
share.coordinator.state.topic.replication.factor 

The replication factor for the share-group state topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.

Default 3 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.state.topic.segment.bytes 

The log segment size for the share-group state topic.

Default 104857600
share.coordinator.state.topic.min.isr 

Overridden min.insync.replicas for the share-group state topic.

Default 2 (specified as 1 in the example configuration files delivered in the AK distribution for single-broker use)
share.coordinator.threads 

The number of threads used by the share coordinator.

Default 1, minimum 1
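
For illustration, a broker configuration which enables share groups and overrides a few of these settings might look like this (values are examples only):

Code Block
# Example broker properties (illustrative values)
group.coordinator.rebalance.protocols=classic,consumer,share
group.share.delivery.count.limit=5
group.share.record.lock.duration.ms=30000
share.coordinator.state.topic.replication.factor=3
share.coordinator.state.topic.min.isr=2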

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "InitializeShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "Topics", "type": "[]InitializeStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch forof thisthe share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being initialized." }
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about", "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch forof thisthe share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived." },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "WriteShareGroupStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [[
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicIdPartitions", "type": "uuid[]PartitionData", "versions": "0+",
        "about":  "The topic identifier." },
data for the partitions.", "fields": [
        { "name": "PartitionsPartition", "type": "[]PartitionDataint32", "versions": "0+",
          "about":  "The data for the partitionspartition index.", "fields": [ },
        { "name": "PartitionStateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition index." },
        { "name": "StateEpochLeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The stateleader epoch forof thisthe share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset, or -1 if the start offset is not being written." },
        { "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset of this state batch." },
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "The last offset of this state batch." },
          { "name": "DeliveryState", "type": "int8", "versions": "0+",
            "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
          { "name": "DeliveryCount", "type": "int16", "versions": "0+",
            "about": "The delivery count." }
        ]}
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "WriteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]WriteStateResult", "versions": "0+",
      "about": "The write results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupOffsetsStateRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]ReadOffsetsStateData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupOffsetsStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadOffsetsStateResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch forof thisthe share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

...

ShareSnapshotValue

Code Block
{
  "type": "data",
  "name": "ShareSnapshotValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "typename": "dataSnapshotEpoch",
  "nametype": "ShareSnapshotValueuint16",
  "validVersionsversions": "0",
      "flexibleVersionsabout": "0+",
The snapshot "fieldsepoch.": [},
    { "name": "SnapshotEpochStateEpoch", "type": "uint16int32", "versions": "0+",
      "about": "The snapshotstate epoch of the share-partition." },
    { "name": "StateEpochLeaderEpoch", "type": "int32", "versions": "0+",
      "about": "The stateleader epoch forof thisthe share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}

...

Code Block
{
  "type": "data",
  "name": "ShareUpdateValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset, or -1 if the start offset is not being updated." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]} 
  ]
}
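
To illustrate how these records combine, here is a hypothetical replay sequence for one share-partition (offsets, epochs and counts are invented for the example):

Code Block
// 1. ShareSnapshot: the baseline state at snapshot epoch 3.
{ "SnapshotEpoch": 3, "StateEpoch": 1, "LeaderEpoch": 5, "StartOffset": 100,
  "StateBatches": [
    { "FirstOffset": 110, "LastOffset": 119, "DeliveryState": 0, "DeliveryCount": 1 } ]}

// 2. ShareUpdate: applied on top of snapshot epoch 3; the SPSO is not being updated (-1).
{ "SnapshotEpoch": 3, "StartOffset": -1,
  "StateBatches": [
    { "FirstOffset": 110, "LastOffset": 119, "DeliveryState": 2, "DeliveryCount": 1 } ]}

// Replayed state: SPSO 100, offsets 110-119 Acked (2) after one delivery.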

Metrics

...

Broker metrics

The following new broker metrics should be added:

Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by the group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The rate and total count of share group rebalances.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 

kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

num-partitions

Gauge

group-coordinator-metrics

protocol: share

The number of share partitions managed by the group coordinator.

kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share 

group-count

Gauge

group-coordinator-metrics

protocol: share

state: {empty|stable|dead} 

The number of share groups in each state.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty|stable|dead} 

share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)

Meter

share-group-metrics

The total number of offsets acknowledged for share groups.

kafka.server:type=share-group-metrics,name=share-acknowledgement-rate 

kafka.server:type=share-group-metrics,name=share-acknowledgement-count 

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

share-group-metrics

ack-type: {accept,release,reject} 

The number of records acknowledged per acknowledgement type.

kafka.server:type=share-group-metrics,name=record-acknowledgement-rate,ack-type={accept,release,reject} 

kafka.server:type=share-group-metrics,name=record-acknowledgement-count,ack-type={accept,release,reject} 

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-group-metrics

The time taken to load the share partitions.

kafka.server:type=share-group-metrics,name=partition-load-time-avg 

kafka.server:type=share-group-metrics,name=partition-load-time-max 

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-coordinator-metrics


The time taken in milliseconds to load the share-group state from the share-group state partitions.

kafka.server:type=share-coordinator-metrics,name=partition-load-time-avg 

kafka.server:type=share-coordinator-metrics,name=partition-load-time-max 

thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg)

Meter

share-coordinator-metrics


The fraction of time the share coordinator thread is idle.

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-min 

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-avg 

write (write-rate and write-total)

Meter

share-coordinator-metrics


The number of share-group state write calls per second.

kafka.server:type=share-coordinator-metrics,name=write-rate 

kafka.server:type=share-coordinator-metrics,name=write-total 

write-latency (write-latency-avg and write-latency-max)

Meter

share-coordinator-metrics


The time taken for a share-group state write call, including the time to write to the share-group state topic.

kafka.server:type=share-coordinator-metrics,name=write-latency-avg 

kafka.server:type=share-coordinator-metrics,name=write-latency-max 

num-partitions

Gauge

share-coordinator-metrics


The number of partitions in the share-state topic.

kafka.server:type=share-coordinator-metrics,name=num-partitions 

The group coordinator uses metrics in the group group-coordinator-metrics. The share-partition leader uses metrics in the group share-group-metrics. The share coordinator uses metrics in the group share-coordinator-metrics.

Client metrics

The following new client metrics should be added:

Metric Name

Type

Group

Tags

Description

JMX Bean

last-poll-seconds-ago

Gauge

share-consumer-metrics

client-id 

The number of seconds since the last poll() invocation.

kafka.consumer:type=share-consumer-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+) 

time-between-poll-avg

Meter

share-consumer-metrics

client-id 

The average delay between invocations of poll() in milliseconds.

kafka.consumer:type=share-consumer-metrics,name=time-between-poll-avg,client-id=([-.\w]+) 

time-between-poll-max

Meter

share-consumer-metrics

client-id 

The maximum delay between invocations of poll() in milliseconds.

kafka.consumer:type=share-consumer-metrics,name=time-between-poll-max,client-id=([-.\w]+) 

poll-idle-ratio-avg

Meter

share-consumer-metrics

client-id 

The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records.

kafka.consumer:type=share-consumer-metrics,name=poll-idle-ratio-avg,client-id=([-.\w]+) 

heartbeat-response-time-max

Meter

share-consumer-coordinator-metrics

client-id 

The maximum time taken to receive a response to a heartbeat request in milliseconds.

kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-response-time-max,client-id=([-.\w]+) 

heartbeat-rate

Meter

share-consumer-coordinator-metrics

client-id 

The number of heartbeats per second.

kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-rate,client-id=([-.\w]+) 

heartbeat-total

Meter

share-consumer-coordinator-metrics

client-id 

The total number of heartbeats.

kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-total,client-id=([-.\w]+) 

last-heartbeat-seconds-ago

Gauge

share-consumer-coordinator-metrics

client-id 

The number of seconds since the last coordinator heartbeat was sent.

kafka.consumer:type=share-consumer-coordinator-metrics,name=last-heartbeat-seconds-ago,client-id=([-.\w]+) 

rebalance-latency-avg

Meter

share-consumer-coordinator-metrics

client-id 

The average time taken for a group to complete a rebalance in milliseconds.

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-latency-avg,client-id=([-.\w]+) 

rebalance-latency-max

Meter

share-consumer-coordinator-metrics

client-id 

The maximum time taken for a group to complete a rebalance in milliseconds.

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-latency-max,client-id=([-.\w]+) 

rebalance-latency-total

Meter

share-consumer-coordinator-metrics

client-id 

The total number of milliseconds spent in rebalances.

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-latency-total,client-id=([-.\w]+) 

rebalance-total

Meter

share-consumer-coordinator-metrics

client-id 

The total number of rebalance events.

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-total,client-id=([-.\w]+) 

rebalance-rate-per-hour

Meter

share-consumer-coordinator-metrics

client-id 

The number of rebalance events per hour.

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-rate-per-hour,client-id=([-.\w]+) 

fetch-size-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of bytes fetched per request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-size-avg,client-id=([-.\w]+) 

fetch-size-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The maximum number of bytes fetched per request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-size-max,client-id=([-.\w]+) 

bytes-fetched-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of bytes fetched per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=bytes-fetched-rate,client-id=([-.\w]+) 

bytes-fetched-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of bytes fetched.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=bytes-fetched-total,client-id=([-.\w]+) 

records-per-request-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of records in each request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-per-request-avg,client-id=([-.\w]+) 

records-per-request-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The maximum number of records in a request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-per-request-max,client-id=([-.\w]+) 

records-fetched-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of records fetched per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-fetched-rate,client-id=([-.\w]+) 

records-fetched-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of records fetched.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-fetched-total,client-id=([-.\w]+) 

acknowledgements-send-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of record acknowledgements sent per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-send-rate,client-id=([-.\w]+) 

acknowledgements-send-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of record acknowledgements sent.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-send-total,client-id=([-.\w]+) 

acknowledgements-error-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of record acknowledgements that resulted in errors per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-error-rate,client-id=([-.\w]+) 

acknowledgements-error-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of record acknowledgements that resulted in errors.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-error-total,client-id=([-.\w]+) 

fetch-latency-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average time taken for a fetch request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-latency-avg,client-id=([-.\w]+) 

fetch-latency-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The maximum time taken for any fetch request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-latency-max,client-id=([-.\w]+) 

fetch-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The number of fetch requests per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-rate,client-id=([-.\w]+) 

fetch-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of fetch requests.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-total,client-id=([-.\w]+) 

fetch-throttle-time-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average throttle time in milliseconds.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-throttle-time-avg,client-id=([-.\w]+) 

fetch-throttle-time-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The maximum throttle time in milliseconds.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-throttle-time-max,client-id=([-.\w]+) 
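
As an illustration, these metrics can also be inspected programmatically through the share consumer's metrics()  method; a sketch (configuration and error handling omitted, topic name hypothetical):

Code Block
languagejava
import java.time.Duration;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ShareConsumerMetricsDump {
    // Prints the fetch-manager metrics of an already-configured share consumer.
    static void printFetchMetrics(KafkaShareConsumer<String, String> consumer) {
        consumer.subscribe(Set.of("t1")); // hypothetical topic
        consumer.poll(Duration.ofSeconds(1));
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            if ("share-consumer-fetch-manager-metrics".equals(entry.getKey().group())) {
                System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
            }
        }
    }
}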

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

...

At these stages, KIP-932 can be used for familiarization and experimentation, but not production use. It is disabled in the default configuration for the cluster, and must be explicitly enabled. Doing so is not appropriate in a production cluster.

To turn on the feature, add "share"  to the group.coordinator.rebalance.protocols  configuration. There is no support for upgrade or downgrade.

...

To upgrade a cluster, it is first necessary to perform a rolling upgrade of the cluster to a software version which supports share groups. Then, the new protocol is enabled using the kafka-features.sh  tool by setting a group.version  which supports share groups. Finally, the group.coordinator.rebalance.protocols  configuration is changed to add "share"  to the list of enabled rebalance protocols.
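
For example, the steps after the rolling software upgrade might look like this (the exact group.version  feature level depends on the release):

Code Block
languagebash
# Enable a group.version feature level which supports share groups
bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature group.version=<level>

# Then add "share" to the enabled rebalance protocols in the broker configuration
# (server.properties):
#   group.coordinator.rebalance.protocols=classic,consumer,share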

This KIP builds upon KIP-848 which introduced the new group coordinator and the new records for the __consumer_offsets  topic. The pre-KIP-848 group coordinator will not recognize the new records, so this downgrade is not supported.

Downgrading to a software version that supports the new group coordinator but does not support share groups is supported. KIP-932 adds new records to the __consumer_offsets  topic which will not be understood by the older group coordinator; it will simply ignore these records. The __share_group_state  topic will be unused because there will be no share coordinator, and it can be deleted manually.

...