
...

Share group membership is controlled by the group coordinator. Consumers in a share group use the heartbeat mechanism to join, leave and confirm continued membership of the share group, using the new ShareGroupHeartbeat RPC. Share-partition assignment is also piggybacked on the heartbeat mechanism. Share groups only support server-side assignors, which implement the org.apache.kafka.coordinator.group.shareassignor.ShareGroupPartitionAssignor interface. There is just one implementation of this interface so far, org.apache.kafka.coordinator.group.share.SimpleAssignor.

...

The share-group state data is not very amenable to log compaction. As a result, the share coordinator uses unlimited log retention and prunes the log records itself using ReplicaManager.deleteRecords. The share coordinator can delete all records before the latest ShareSnapshot for all active share-partitions. By taking periodic snapshots, the latest ShareSnapshot is replaced. For idle share-partitions, the share coordinator will periodically write a new ShareSnapshot so the older records can be pruned.
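
As a rough illustration of the pruning rule (a sketch only; the class, method and map below are hypothetical, not actual broker internals): for one __share_group_state partition, the share coordinator tracks the log offset of the latest ShareSnapshot of every share-partition it hosts, and every record before the minimum of those offsets can be handed to ReplicaManager.deleteRecords.

Code Block
languagejava
import java.util.Map;
import java.util.OptionalLong;

// Illustrative sketch only; the share coordinator's real bookkeeping differs.
final class SnapshotPruning {
    /**
     * @param latestSnapshotOffset log offset of the latest ShareSnapshot for each
     *                             share-partition hosted on this state partition,
     *                             keyed as "groupId:topicId:partition"
     * @return the offset before which all records can be deleted, or empty if no
     *         share-partitions are hosted (in which case this sketch prunes nothing)
     */
    static OptionalLong safePruneOffset(Map<String, Long> latestSnapshotOffset) {
        return latestSnapshotOffset.values().stream()
                .mapToLong(Long::longValue)
                .min();
    }
}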

Using the share coordinator from the group coordinator

The group coordinator is responsible for asking the share coordinator to initialize and delete the durable share-partition state.


The group coordinator uses the ShareGroupPartitionMetadata record to keep track of which share-partitions have been initialized.

The state is initialized in the following cases:

  • When a topic is added to the set of subscribed topics for a share group and is not yet in the ShareGroupPartitionMetadata record
  • When partitions are added to a topic which is already in the ShareGroupPartitionMetadata record
  • When KafkaAdmin.alterShareGroupOffsets  is used to reset the SPSO for a share-partition

The state is deleted in the following cases:

  • When a topic in the ShareGroupPartitionMetadata record is deleted
  • When KafkaAdmin.deleteShareGroupOffsets  is used to delete state for a share-partition, most likely as a result of an administrator cleaning up a topic which is no longer in use by this share group
  • When the share group is deleted

The group coordinator periodically reconciles its "hard" state in the ShareGroupPartitionMetadata with the "soft" state in the cluster metadata. This is how it observes relevant changes to topics, such as topic deletion. The ShareGroupPartitionMetadata contains the set of topic-partitions which are known to be initialized.

In order to add a topic, the group coordinator:

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata record with the topic added.

In order to add partitions to a known topic, the group coordinator:

  • Sends the InitializeShareGroupState  RPC to the share coordinators for all of the new partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata record with the number of partitions for the topic increased.

In order to remove a topic, the group coordinator:

  • Sends the DeleteShareGroupState  RPC to the share coordinators for all of the partitions of the topic. This step can be repeated to handle failures, such as group coordinator failover.
  • When it has successful responses for all partitions, it writes a ShareGroupPartitionMetadata record with the topic removed.
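
All three flows above share the same shape: first obtain successful InitializeShareGroupState (or DeleteShareGroupState) responses for every affected partition, and only then record the outcome in ShareGroupPartitionMetadata. A minimal sketch of the add-a-topic flow, using hypothetical helper names rather than real group coordinator internals:

Code Block
languagejava
import org.apache.kafka.common.Uuid;

// Hypothetical sketch of the "add a topic" flow. The abstract helpers stand in
// for real group coordinator internals and share coordinator RPCs.
abstract class AddTopicFlow {
    // Sends InitializeShareGroupState for one share-partition and waits for success.
    abstract void sendInitializeShareGroupState(String groupId, Uuid topicId, int partition);

    // Writes a ShareGroupPartitionMetadata record with the topic added.
    abstract void writeMetadataWithTopicAdded(String groupId, Uuid topicId, int numPartitions);

    void addTopic(String groupId, Uuid topicId, int numPartitions) {
        // Step 1: initialize the durable state for every partition of the topic.
        // This step is idempotent, so it can be repeated to handle failures such
        // as group coordinator failover.
        for (int partition = 0; partition < numPartitions; partition++) {
            sendInitializeShareGroupState(groupId, topicId, partition);
        }
        // Step 2: once all partitions have responded successfully, record the
        // topic in ShareGroupPartitionMetadata so it can be included in assignments.
        writeMetadataWithTopicAdded(groupId, topicId, numPartitions);
    }
}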

Using the share coordinator from the share-partition leader

When a share-partition leader receives its first ShareFetch request for a topic-partition, it needs to initialize its share-partition state. It finds the share coordinator using the FindCoordinator RPC using (key: "groupId:topicId:partition", key_type: SHARE ). Then, it sends the ReadShareGroupState RPC to the share coordinator. If the share coordinator has no share-partition state to return, it returns the UNKNOWN_TOPIC_OR_PARTITION error code indicating that this share-partition is not actually part of the share group. Otherwise, it returns the state to the share-partition leader which uses it to initialize and begin fetching records for the consumers. The SPSO returned might be -1 indicating that the initial SPSO needs to be set based on the group.share.auto.offset.reset configuration.
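
As a rough illustration of the last point (a sketch only; the class, method and parameter names are hypothetical, and it assumes group.share.auto.offset.reset takes earliest/latest values analogous to auto.offset.reset):

Code Block
languagejava
// Illustrative only: a share-partition leader deciding its initial SPSO from a
// ReadShareGroupState response.
final class InitialSpso {
    enum AutoOffsetReset { EARLIEST, LATEST }

    /**
     * @param returnedSpso   SPSO returned by the share coordinator, or -1 if unset
     * @param logStartOffset first offset of the topic-partition's log
     * @param logEndOffset   next offset to be written to the log
     * @param reset          value of group.share.auto.offset.reset
     */
    static long initialSpso(long returnedSpso, long logStartOffset, long logEndOffset,
                            AutoOffsetReset reset) {
        if (returnedSpso >= 0) {
            return returnedSpso;  // durable state already exists for this share-partition
        }
        // An SPSO of -1 means there is no state yet, so fall back to the
        // group.share.auto.offset.reset configuration.
        return reset == AutoOffsetReset.EARLIEST ? logStartOffset : logEndOffset;
    }
}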

The share-partition leader must be aware of when the group coordinator is being used to alter the SPSO with a KafkaAdmin.alterShareGroupOffsets request. This only occurs when the group is empty. As a result, when the set of share sessions transitions from 0 to 1, the share-partition leader uses the ReadShareGroupOffsetsState RPC to validate its state epoch (this request is much cheaper for the share coordinator to handle than ReadShareGroupState ). We know that there are no acquired records, so re-initializing the share-partition leader is non-disruptive. If the state epoch has changed, the share-partition leader issues a ReadShareGroupState RPC to the share coordinator and uses the response to re-initialize.  

When a share-partition leader needs to update the durable share-partition state because of an acknowledgement or other state changed (such as a lock timeout), it sends the WriteShareGroupState RPC to the share coordinator. The share coordinator keeps track of the accumulated state of the share-partition and chooses how to record it to the share-state topic. Once it has successfully written to the topic and replication has completed, the RPC response is sent.

When a share partition is removed from a share group, perhaps because the topic is deleted or the administrator deletes the share-partition offsets (see KafkaAdmin.deleteShareGroupOffsets), the share-partition leader sends the DeleteShareGroupState RPC to the share coordinator. The share coordinator writes a final ShareSnapshot record with the special snapshot epoch value of -1. This acts as a deletion marker so the recovery processing of the share-group state topic sees the share-partition has been removed. There is no need to retain the deletion marker. Its only purpose is to make sure that a share coordinator reading records for a share-partition before it was removed, notices that those records apply to a defunct share-partition.

Examples with share-group state

Here are some examples showing the writing of share-group state.

Operation

State changes

Cumulative state

WriteShareGroupState request

Starting state of topic-partition with latest offset 100

SPSO=100, SPEO=100

SPSO=100, SPEO=100


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 100,
  "StateBatches": []
}


In the batched case with successful processing, there’s a state change per batch to move the SPSO forwards


Fetch records 100-109

SPEO=110, records 100-109 (acquired, delivery count 1)

SPSO=100, SPEO=110, records 100-109 (acquired, delivery count 1)


Acknowledge 100-109

SPSO=110

SPSO=110, SPEO=110


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 110,
  "StateBatches": []
}


With a messier sequence of release and acknowledge, there’s a state change for each operation which can act on multiple records


Fetch records 110-119

Consumer 1 gets 110-112, consumer 2 gets 113-118, consumer 3 gets 119

SPEO=120, records 110-119 (acquired, delivery count 1)

SPSO=110, SPEO=120, records 110-119 (acquired, delivery count 1)


Release 110 (consumer 1)

record 110 (available, delivery count 1)

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-119 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 110,
      "LastOffset": 110,
      "DeliveryState": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 119 (consumer 3)

record 110 (available, delivery count 1), records 111-118 acquired, record 119 acknowledged

SPSO=110, SPEO=120, record 110 (available, delivery count 1), records 111-118 (acquired, delivery count 1), record 119 acknowledged


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 119,
      "LastOffset": 119,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 110, 120 (consumer 1)

SPEO=121, record 110 (acquired, delivery count 2), record 120 (acquired, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Lock timeout elapsed 111, 112 (consumer 1's records)

records 111-112 (available, delivery count 1)

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-118 (acquired, delivery count 1), record 119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 111,
      "LastOffset": 112,
      "DeliveryState": 0 (Available),
      "DeliveryCount": 1
    }
  ]
}


Acknowledge 113-118 (consumer 2)

records 113-118 acknowledged

SPSO=110, SPEO=121, record 110 (acquired, delivery count 2), records 111-112 (available, delivery count 1), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 113,
      "LastOffset": 118,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 1
    }
  ]
}


Fetch records 111, 112 (consumer 3)

records 111-112 (acquired, delivery count 2)

SPSO=110, SPEO=121, records 110-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Acknowledge 110 (consumer 1)

SPSO=111

SPSO=111, SPEO=121, records 111-112 (acquired, delivery count 2), records 113-119 acknowledged, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": -1,
  "StateBatches": [
    {
      "FirstOffset": 110,
      "LastOffset": 110,
      "DeliveryState": 2 (Acknowledged),
      "DeliveryCount": 2
    }
  ]
}


Acknowledge 111, 112 (consumer 3)

SPSO=120

SPSO=120, SPEO=121, record 120 (acquired, delivery count 1)


Code Block
{
  "GroupId": "G1",
  "TopicId": topic_uuid,
  "Partition": 0,
  "StartOffset": 120,
  "StateBatches": []
}


...

Operation | Involves | Notes

Create share group | Group coordinator

This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat RPC. It writes a ShareGroupMetadata record for the share group and a ShareGroupMemberMetadata record for the member onto the __consumer_offsets topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState RPC. It writes a ShareSnapshot record to the __share_group_state topic. When the record is replicated, the share coordinator responds to the RPC.

d) Back in the group coordinator, it writes a ShareGroupPartitionMetadata record on the __consumer_offsets topic for the share-partitions which are now initialized. The share-partitions are now able to be included in an assignment in the share group.

Assign a share-partition | Group coordinator and optionally share coordinator

When a topic-partition is assigned to a member of a share group for the first time, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata record, and the share-partition is now able to be included in an assignment in the share group.

List share groups | Group coordinator

Describe share group | Group coordinator

List share group offsets | Group coordinator and share coordinator

The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupOffsetsState request. Although the share-partition leader also knows this information, the share coordinator provides it here because when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required.

Alter share group offsets | Group coordinator and share coordinator

Only empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata, and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic.

Delete share group offsets | Group coordinator and share coordinator

This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of the offsets. It then sends a DeleteShareGroupState request to the share coordinator which writes tombstones to logically delete the state. Then the group coordinator writes a second ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the offsets.

Delete share group | Group coordinator and share coordinator

Only empty share groups can be deleted. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic to record the pending deletion of all share-group state. It then sends a DeleteShareGroupState request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state topic. Then the group coordinator writes a tombstone ShareGroupPartitionMetadata and finally a tombstone ShareGroupMetadata record to the __consumer_offsets topic.

...

They are all subclasses of RetriableException .

Broker API

...

ShareGroupPartitionAssignor

The new org.apache.kafka.coordinator.group.shareassignor.ShareGroupPartitionAssignor interface is implemented by server-side assignors for share groups. It signifies that the partition assignor is suitable for use with share groups.

Code Block
languagejava
package org.apache.kafka.coordinator.group.shareassignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
 * Server side partition assignor used by the GroupCoordinator.
 *
 * The interface is kept in an internal module until KIP-932 is fully
 * implemented and ready to be released.
 */
@InterfaceStability.Unstable
public interface ShareGroupPartitionAssignor extends PartitionAssignor {
}


Command-line tools

kafka-share-groups.sh

A new tool called kafka-share-groups.sh is added for working with share groups. It has the following options:

Option

Description

--all-topics

Consider all topics assigned to a group in the `reset-offsets` process.

--bootstrap-server <String: server to connect to>

REQUIRED: The server(s) to connect to.

--command-config <String: command config property file>

Property file containing configs to be passed to Admin Client.

--delete

Pass in groups to delete topic partition offsets over the entire share group. For instance --group g1 --group g2

--delete-offsets

Delete offsets of share group. Supports one share group at a time, and multiple topics.

--describe

Describe share group and list offset lag (number of records not yet processed) related to given group.

--dry-run

Only show results without executing changes on share groups. Supported operations: reset-offsets.

--execute

Execute operation. Supported operations: reset-offsets.

--group <String: share group>

The share group we wish to act on.

--help

Print usage information.

--list

List all share groups.

--members

Describe members of the group. This option may be used with the '--describe' option only.

--offsets

Describe the group and list all topic partitions in the group along with their offset lag. This is the default sub-action of and may be used with the '--describe' option only.

--reset-offsets

Reset offsets of share group. Supports one share group at a time, and instances must be inactive.

--state [String]

When specified with '--describe', includes the state of the group. When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states.

--timeout <Long: timeout (ms)>

The timeout that can be set for some use cases. For example, it can be used when describing the group to specify the maximum amount of time in milliseconds to wait before the group stabilizes (when the group is just created, or is going through some changes). (default: 5000)   

--to-datetime <String: datetime>

Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'.

--to-earliest

Reset offsets to earliest offset.

--to-latest

Reset offsets to latest offset.

--topic <String: topic>

The topic whose share group information should be deleted or topic which should be included in the reset offset process.

--version

Display Kafka version.
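
For example (the broker address, group name G1 and topic name T1 are placeholders):

Code Block
# List all share groups
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list

# Describe a share group, including its members
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group G1 --members

# Preview, then perform, a reset of a share group's offsets to the earliest offset
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group G1 --topic T1 --to-earliest --dry-run
bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group G1 --topic T1 --to-earliest --execute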

...

This table gives the ACLs required for the new APIs.

RPC | Operation | Resource

ShareGroupHeartbeat | Read | Group
ShareGroupDescribe | Describe | Group
ShareFetch | Read | Group and Topic
ShareAcknowledge | Read | Group and Topic
AlterShareGroupOffsets | Read | Group
DeleteShareGroupOffsets | Read | Group
DescribeShareGroupOffsets | Describe | Group
InitializeShareGroupState | ClusterAction | Cluster
ReadShareGroupState | ClusterAction | Cluster
WriteShareGroupState | ClusterAction | Cluster
DeleteShareGroupState | ClusterAction | Cluster
ReadShareGroupOffsetsState | ClusterAction | Cluster

Error codes

This KIP adds the following error codes to the Kafka protocol.

...

Code Block
{
  "type": "data",
  "name": "ShareGroupPartitionMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicMetadata",
      "about": "The topics with initialized share-group state." }
  ],
  "commonStructs": [
    { "name": "TopicMetadata", "versions": "0+", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "TopicName", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions." }
    ]}
  ]
}

Share-group state

These records are written by the share coordinator on the __share_group_state  topic.

ShareSnapshotKey

Code Block
{
  "type": "data",
  "name": "ShareSnapshotKey",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." }
  ]
}

ShareSnapshotValue

Code Block
{
  "type": "data",
  "name": "ShareSnapshotValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "StateEpoch", "type": "int32", "versions": "0",
      "about": "The state epoch for this share-partition." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]}
  ]
}



ShareUpdateKey

Code Block
{
  "type": "data",
  "name": "ShareUpdateKey",
  "validVersions": "1",
  "flexibleVersions": "none",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0",
      "about": "The group id." },
    { "name": "TopicId", "type": "uuid", "versions": "0",
      "about": "The topic id." },
    { "name": "Partition", "type": "int32", "versions": "0",
      "about": "The partition index." }
  ]
}

...

ShareUpdateValue

Code Block
{
  "type": "data",
  "name": "ShareUpdateValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SnapshotEpoch", "type": "uint16", "versions": "0",
      "about": "The snapshot epoch." },
    { "name": "StartOffset", "type": "int64", "versions": "0",
      "about": "The share-partition start offset, or -1 if the start offset is not being updated." },
    { "name": "StateBatches", "type": "[]StateBatch", "versions": "0", "fields": [
      { "name": "FirstOffset", "type": "int64", "versions": "0",
        "about": "The first offset of this state batch." },
      { "name": "LastOffset", "type": "int64", "versions": "0",
        "about": "The last offset of this state batch." },
      { "name": "DeliveryState", "type": "int8", "versions": "0",
        "about": "The delivery state - 0:Available,2:Acked,4:Archived" },
      { "name": "DeliveryCount", "type": "int16", "versions": "0",
        "about": "The delivery count." }
    ]}
  ]
}
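
As a rough illustration of how these two record types relate (a sketch with hypothetical types, not the share coordinator's actual replay logic), a ShareUpdate is applied on top of the state established by the preceding ShareSnapshot: a StartOffset of -1 leaves the SPSO unchanged, and the update's state batches carry the latest delivery state for the offsets they cover.

Code Block
languagejava
import java.util.List;

// Hypothetical in-memory state for one share-partition while replaying the
// __share_group_state topic; not the actual share coordinator classes.
final class ReplaySketch {
    static final class State {
        long startOffset;           // the SPSO
        List<StateBatch> batches;   // delivery state per offset range
    }

    record StateBatch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) { }

    // Apply one ShareUpdate on top of the current state.
    static void applyUpdate(State state, long updateStartOffset, List<StateBatch> updateBatches) {
        if (updateStartOffset != -1) {
            // -1 means the start offset is not being updated.
            state.startOffset = updateStartOffset;
        }
        // The update's batches describe the latest delivery state for the offsets
        // they cover; the offset-range merging itself is elided from this sketch.
        state.batches = mergeBatches(state.batches, updateBatches);
    }

    static List<StateBatch> mergeBatches(List<StateBatch> current, List<StateBatch> updates) {
        throw new UnsupportedOperationException("illustrative sketch only");
    }
}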

Metrics

Broker Metrics

The following new broker metrics should be added:



Metric Name

Type

Group

Tags

Description

JMX Bean

group-count

Gauge

group-coordinator-metrics

protocol: share

The total number of share groups managed by group coordinator.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share 

rebalance (rebalance-rate and rebalance-count)

Meter

group-coordinator-metrics

protocol: share

The total number of share group rebalances count and rate.

kafka.server:type=group-coordinator-metrics,name=rebalance-rate,protocol=share 

kafka.server:type=group-coordinator-metrics,name=rebalance-count,protocol=share 

num-partitions

Gauge

group-coordinator-metrics

protocol: share

The number of share partitions managed by group coordinator.

kafka.server:type=group-coordinator-metrics,name=num-partitions,protocol=share

group-count

Gauge

group-coordinator-metrics

protocol: share

state: {empty|stable|dead}

The number of share groups in respective state.

kafka.server:type=group-coordinator-metrics,name=group-count,protocol=share,state={empty|stable|dead}

share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

The total number of offsets acknowledged for share groups.

kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-rate,protocol=share

kafka.server:type=group-coordinator-metrics,name=share-acknowledgement-count,protocol=share

record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count)

Meter

group-coordinator-metrics

protocol: share

ack-type: {accept,release,reject}

The number of records acknowledged per acknowledgement type.

kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-rate,protocol=share,ack-type={accept,release,reject}

kafka.server:type=group-coordinator-metrics,name=record-acknowledgement-count,protocol=share,ack-type={accept,release,reject}

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

group-coordinator-metrics

protocol: share

The time taken to load the share partitions.

kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg,protocol=share

kafka.server:type=group-coordinator-metrics,name=partition-load-time-max,protocol=share

partition-load-time (partition-load-time-avg and partition-load-time-max)

Meter

share-coordinator-metrics

The time taken in milliseconds to load the share-group state from the share-group state partitions.

kafka.server:type=share-coordinator-metrics,name=partition-load-time-avg

kafka.server:type=share-coordinator-metrics,name=partition-load-time-max

thread-idle-ratio (thread-idle-ratio-min and thread-idle-ratio-avg)

Meter

share-coordinator-metrics

The fraction of time the share coordinator thread is idle.

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-min

kafka.server:type=share-coordinator-metrics,name=thread-idle-ratio-avg

write (write-rate and write-total)

Meter

share-coordinator-metrics

The number of share-group state write calls per second.

kafka.server:type=share-coordinator-metrics,name=write-rate

kafka.server:type=share-coordinator-metrics,name=write-total

write-latency (write-latency-avg and write-latency-max)

Meter

share-coordinator-metrics

The time taken for a share-group state write call, including the time to write to the share-group state topic.

kafka.server:type=share-coordinator-metrics,name=write-latency-avg

kafka.server:type=share-coordinator-metrics,name=write-latency-max

Client metrics

The following new client metrics should be added:

Metric Name

Type

Group

Tags

Description

JMX Bean

last-poll-seconds-ago

Gauge

share-consumer-metrics

client-id 

The number of seconds since the last poll() invocation.

kafka.consumer:type=share-consumer-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+) 

time-between-poll-avg

Meter

share-consumer-metrics

client-id 

The average delay between invocations of poll() in milliseconds.

kafka.consumer:type=share-consumer-metrics,name=time-between-poll-avg,client-id=([-.\w]+) 

time-between-poll-max

Meter

share-consumer-metrics

client-id 

The max delay between invocations of poll() in milliseconds.

kafka.consumer:type=share-consumer-metrics,name=time-between-poll-max,client-id=([-.\w]+) 

poll-idle-ratio-avg

Meter

share-consumer-metrics

client-id 

The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records.

kafka.consumer:type=share-consumer-metrics,name=poll-idle-ratio-avg,client-id=([-.\w]+) 

heartbeat-response-time-max

Meter

share-consumer-coordinator-metrics

client-id 

The max time taken to receive a response to a heartbeat request

kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-response-time-max,client-id=([-.\w]+) 

heartbeat-rate

Meter

share-consumer-coordinator-metrics

client-id 

The number of heartbeats per second

kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-rate,client-id=([-.\w]+) 

heartbeat-total

Meter

share-consumer-coordinator-metrics

client-id 

The total number of heartbeats

kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-total,client-id=([-.\w]+) 

last-heartbeat-seconds-ago

Gauge

share-consumer-coordinator-metrics

client-id 

The number of seconds since the last coordinator heartbeat was sent

kafka.consumer:type=share-consumer-coordinator-metrics,name=last-heartbeat-seconds-ago,client-id=([-.\w]+) 

rebalance-latency-avg

Meter

share-consumer-coordinator-metrics

client-id 

The average time taken for a group to complete a rebalance

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-latency-avg,client-id=([-.\w]+) 

rebalance-latency-max

Meter

share-consumer-coordinator-metrics

client-id 

The max time taken for a group to complete a rebalance

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-latency-max,client-id=([-.\w]+) 

rebalance-latency-total

Meter

share-consumer-coordinator-metrics

client-id 

The total number of milliseconds spent in rebalances

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-latency-total,client-id=([-.\w]+) 

rebalance-total

Meter

share-consumer-coordinator-metrics

client-id 

The total number of rebalance events

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-total,client-id=([-.\w]+) 

rebalance-rate-per-hour

Meter

share-consumer-coordinator-metrics

client-id 

The number of rebalance events per hour

kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-rate-per-hour,client-id=([-.\w]+) 

fetch-size-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of bytes fetched per request

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-size-avg,client-id=([-.\w]+) 

fetch-size-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The maximum number of bytes fetched per request

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-size-max,client-id=([-.\w]+) 

bytes-fetched-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of bytes fetched per second

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=bytes-fetched-rate,client-id=([-.\w]+) 

bytes-fetched-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of bytes fetched

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=bytes-fetched-total,client-id=([-.\w]+) 

records-per-request-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of records in each request

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-per-request-avg,client-id=([-.\w]+) 

records-fetched-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of records fetched per second

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-fetched-rate,client-id=([-.\w]+) 

records-fetched-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of records fetched

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-fetched-total,client-id=([-.\w]+) 

acknowledgements-send-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of record acknowledgements sent per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-send-rate,client-id=([-.\w]+) 

acknowledgements-send-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of record acknowledgements sent.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-send-total,client-id=([-.\w]+) 

acknowledgements-error-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The average number of record acknowledgements that resulted in errors per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-error-rate,client-id=([-.\w]+) 

acknowledgements-error-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of record acknowledgements that resulted in errors.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-error-total,client-id=([-.\w]+) 

fetch-latency-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average time taken for a fetch request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-latency-avg,client-id=([-.\w]+) 

fetch-latency-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The max time taken for any fetch request.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-latency-max,client-id=([-.\w]+) 

fetch-rate

Meter

share-consumer-fetch-manager-metrics

client-id 

The number of fetch requests per second.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-rate,client-id=([-.\w]+) 

fetch-total

Meter

share-consumer-fetch-manager-metrics

client-id 

The total number of fetch requests.

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-total,client-id=([-.\w]+) 

fetch-throttle-time-avg

Meter

share-consumer-fetch-manager-metrics

client-id 

The average throttle time in ms

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-throttle-time-avg,client-id=([-.\w]+) 

fetch-throttle-time-max

Meter

share-consumer-fetch-manager-metrics

client-id 

The maximum throttle time in ms

kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-throttle-time-max,client-id=([-.\w]+) 
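
These metrics can also be read programmatically. The snippet below assumes the new KafkaShareConsumer exposes a metrics() method in the same way as KafkaConsumer; the group and metric names are the ones listed above.

Code Block
languagejava
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

import java.util.Map;

// Sketch: print the share consumer's fetch metrics. Construction of the consumer
// is omitted, and the metrics() method is assumed to mirror KafkaConsumer.
final class ShareConsumerMetricsDump {
    static void dumpFetchMetrics(KafkaShareConsumer<String, String> consumer) {
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            MetricName name = entry.getKey();
            if (name.group().equals("share-consumer-fetch-manager-metrics")) {
                System.out.println(name.name() + " = " + entry.getValue().metricValue());
            }
        }
    }
}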

Future Work

There are some obvious extensions to this idea which are not included in this KIP in order to keep the scope manageable.

...