
...

The following diagram illustrates how these pieces are wired together using the new Kafka protocol RPCs introduced by this KIP.

[Diagram]

Relationship with consumer groups

...

This KIP introduces org.apache.kafka.coordinator.group.share.SimpleAssignor. It attempts to balance the number of consumers assigned to the subscribed partitions, and assumes that all consumers are equally capable of consuming records. When calculating a new target assignment, the assignor is aware of the current assignment (assuming the group coordinator hasn't changed) and can use it to influence the calculation of the new target assignment. The calculation proceeds as follows:

  1. The assignor hashes the member IDs of the members and maps the partitions assigned to the members based on the hash. This gives approximately even balance.

  2. If any partitions were not assigned any members by (1) and do not have members already assigned in the current assignment, members are assigned round-robin until each partition has at least one member assigned to it.

  3. If any partitions were assigned members by (1) and also have members in the current assignment assigned by (2), the members assigned by (2) are removed.

When the number of members is greater than or equal to the number of partitions for a subscribed topic and every partition was assigned at least one member, adding more members to the group simply assigns more members to the partitions. If, however, any partitions were not assigned any members by hashing, adding more members changes the distribution and will revoke some partitions previously assigned. Because assignments in share groups are looser than they are in consumer groups, the group coordinator doesn't require the members to confirm they have revoked partitions before those partitions can be assigned to other members. For a brief period while the rebalancing is occurring, some partitions may have more members consuming than the assignment requires, but this situation soon resolves as the members receive their updated assignments and stop fetching from the revoked partitions.
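Steps (1) and (2) of the calculation above can be sketched as follows. This is an illustrative sketch only, not the actual SimpleAssignor code; the class and method names are invented for the example, and the refinement in step (3) based on the current assignment is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the hash-based balancing in SimpleAssignor.
public class HashingAssignorSketch {

    // Step 1: map each member to a partition by hashing its member ID,
    // giving an approximately even balance across partitions.
    public static Map<Integer, List<String>> assignByHash(List<String> memberIds, int numPartitions) {
        Map<Integer, List<String>> assignment = new HashMap<>();
        for (String memberId : memberIds) {
            // Math.floorMod keeps the index non-negative for any hash value.
            int partition = Math.floorMod(memberId.hashCode(), numPartitions);
            assignment.computeIfAbsent(partition, p -> new ArrayList<>()).add(memberId);
        }
        return assignment;
    }

    // Step 2: assign members round-robin to any partitions left without a
    // member, so every partition has at least one member assigned to it.
    public static void fillUnassigned(Map<Integer, List<String>> assignment,
                                      List<String> memberIds, int numPartitions) {
        int next = 0;
        for (int p = 0; p < numPartitions; p++) {
            if (!assignment.containsKey(p) || assignment.get(p).isEmpty()) {
                assignment.computeIfAbsent(p, x -> new ArrayList<>()).add(memberIds.get(next));
                next = (next + 1) % memberIds.size();
            }
        }
    }
}
```

After step 2, a member may appear on more than one partition, which is acceptable in a share group because assignments are not exclusive.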

...

The share-partition leader must be aware of when the group coordinator is being used to alter the SPSO with a KafkaAdmin.alterShareGroupOffsets request. This only occurs when the group is empty. As a result, when the set of share sessions transitions from 0 to 1, the share-partition leader uses the ReadShareGroupStateSummary RPC to validate its state epoch (this request is much cheaper for the share coordinator to handle than ReadShareGroupState). We know that there are no acquired records, so re-initializing the share-partition leader is non-disruptive. If the state epoch has changed, the share-partition leader issues a ReadShareGroupState RPC to the share coordinator and uses the response to re-initialize.
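The leader's revalidation logic above can be sketched as plain bookkeeping. This is a hypothetical illustration of the decision points only; the class is invented for the example and the real RPC plumbing is omitted.

```java
// Hypothetical sketch of when a share-partition leader revalidates its state.
public class ShareSessionTracker {
    private int openSessions = 0;
    private int cachedStateEpoch;

    public ShareSessionTracker(int initialStateEpoch) {
        this.cachedStateEpoch = initialStateEpoch;
    }

    // Returns true exactly on the 0 -> 1 transition of open share sessions,
    // which is when the leader issues ReadShareGroupStateSummary.
    public boolean sessionOpened() {
        openSessions++;
        return openSessions == 1;
    }

    public void sessionClosed() {
        openSessions = Math.max(0, openSessions - 1);
    }

    // Returns true when the summary's state epoch differs from the cached
    // one, meaning the SPSO was altered while the group was empty and the
    // leader must re-initialize with a full ReadShareGroupState.
    public boolean validateStateEpoch(int summaryStateEpoch) {
        if (summaryStateEpoch != cachedStateEpoch) {
            cachedStateEpoch = summaryStateEpoch;
            return true;
        }
        return false;
    }
}
```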

...

Several components work together to create share groups. The group coordinator is responsible for assignment, membership and the state of the group. The share-partition leaders are responsible for delivery and acknowledgement. The share coordinator is responsible for share-group persistent state.

[Diagram]

The following table explains how the administration operations on share groups work.

Operation | Involves | Notes
Create share group | Group coordinator

This occurs as a side-effect of the initial ShareGroupHeartbeat request.

a) The group coordinator serves the ShareGroupHeartbeat RPC. It writes a ShareGroupMetadata record for the share group and a ShareGroupMemberMetadata record for the member onto the __consumer_offsets topic. The group has now been created but the share-partitions are still being initialized. The group coordinator responds to the ShareGroupHeartbeat RPC, but the list of assignments is empty.

b) For each share-partition being initialized, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The SPSO is not known yet and is initialized to -1. The group epoch is used as the state epoch.

c) The share coordinator serves the InitializeShareGroupState RPC. It writes a ShareSnapshot record to the __share_group_state topic. When the record is replicated, the share coordinator responds to the RPC.

d) The group coordinator then writes a ShareGroupPartitionMetadata record on the __consumer_offsets topic for the share-partitions which are now initialized. The share-partitions can now be included in an assignment in the share group.

Assign a share-partition | Group coordinator and optionally share coordinator

When a topic-partition is assigned to a member of a share group for the first time, the group coordinator sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record to the __share_group_state topic and responds to the group coordinator. The group coordinator writes an updated ShareGroupPartitionMetadata record, and the share-partition can now be included in an assignment in the share group.
List share groups | Group coordinator
Describe share group | Group coordinator
List share group offsets | Group coordinator and share coordinator

The admin client gets a list of share-partitions from the group coordinator. It then asks the group coordinator to request the SPSO of the share-partitions from the share coordinator using a ReadShareGroupStateSummary request. Although the share-partition leader also knows this information, the share coordinator provides it here because, when a share-partition is not used for a while, the share-partition leader frees up the memory, reloading it from the share coordinator when it is next required.
Alter share group offsets | Group coordinator and share coordinator

Only empty share groups support this operation. The group coordinator bumps the group epoch, writes a ShareGroupMetadata record, and sends an InitializeShareGroupState request to the share coordinator. The share coordinator writes a ShareSnapshot record with the new state epoch to the __share_group_state topic. If the partition was not previously initialized, the group coordinator writes an updated ShareGroupPartitionMetadata record.
Delete share group offsets | Group coordinator and share coordinator

This is administrative removal of a topic from a share group. Only empty share groups support this operation. The group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic, adding the topic to the DeletingTopics array and removing it from the InitializedTopics array. This enables an interrupted deletion to be completed. The group coordinator sends a DeleteShareGroupState request to the share coordinator, which writes tombstones to logically delete the state. Then the group coordinator writes an updated ShareGroupPartitionMetadata record to the __consumer_offsets topic to complete the deletion of the topic from the share group.
Delete share group | Group coordinator and share coordinator

Only empty share groups can be deleted. If the share group has any topics with initialized state, the group coordinator writes a ShareGroupPartitionMetadata record to the __consumer_offsets topic moving all initialized topics into the DeletingTopics array, and then sends a DeleteShareGroupState request to each of the share coordinators for this share group's partitions, which write tombstones to logically delete the state from the __share_group_state topic. Then, the group coordinator writes a tombstone ShareGroupPartitionMetadata record and finally a tombstone ShareGroupMetadata record to the __consumer_offsets topic.
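The interrupted-deletion recovery described in the table can be sketched as simple bookkeeping. This is a hypothetical illustration, not the actual group coordinator code: a topic moves into DeletingTopics before any tombstones are written, so a coordinator that fails over mid-delete can resume from the persisted metadata.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the ShareGroupPartitionMetadata bookkeeping for
// topic deletion from a share group.
public class DeletionBookkeepingSketch {
    private final Set<String> initializedTopics = new HashSet<>();
    private final Set<String> deletingTopics = new HashSet<>();

    public void initialize(String topic) {
        initializedTopics.add(topic);
    }

    // First metadata write: move the topic from InitializedTopics to
    // DeletingTopics, before any tombstones are written.
    public void beginDelete(String topic) {
        if (initializedTopics.remove(topic)) {
            deletingTopics.add(topic);
        }
    }

    // Final metadata write after the share coordinator's tombstones.
    public void completeDelete(String topic) {
        deletingTopics.remove(topic);
    }

    // On coordinator failover, deletions still in DeletingTopics are resumed.
    public Set<String> pendingDeletions() {
        return new HashSet<>(deletingTopics);
    }
}
```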

...

  • ShareGroupHeartbeat - for consumers to form and maintain share groups
  • ShareGroupDescribe - for describing share groups
  • ShareFetch - for fetching records from share-partition leaders
  • ShareAcknowledge - for acknowledging delivery of records with share-partition leaders
  • AlterShareGroupOffsets - for altering the share-partition start offsets for the share-partitions in a share group
  • DeleteShareGroupOffsets - for deleting the offsets for the share-partitions in a share group
  • DescribeShareGroupOffsets - for describing the offsets for the share-partitions in a share group
  • InitializeShareGroupState - for initializing share-partition state on a share coordinator
  • ReadShareGroupState - for reading share-partition state from a share coordinator
  • WriteShareGroupState - for writing share-partition state to a share coordinator
  • DeleteShareGroupState - for deleting share-partition state from a share coordinator
  • ReadShareGroupStateSummary - for reading a summary of the offsets from the share-partition state from a share coordinator

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "DeleteShareGroupStateResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_STATE_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
      "about": "The delete results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

ReadShareGroupStateSummary API

The ReadShareGroupStateSummary API is used by the group coordinator to read a summary of the offset information from share-partition state from a share coordinator. This is an inter-broker RPC authorized as a cluster action.

...

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ReadShareGroupStateSummaryRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about":"The group identifier." },
    { "name": "Topics", "type": "[]ReadStateSummaryData", "versions": "0+",
      "about": "The data for the topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about":  "The data for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The leader epoch of the share-partition." }
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ReadShareGroupStateSummaryResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // - NOT_COORDINATOR (version 0+)  
  // - COORDINATOR_NOT_AVAILABLE (version 0+)
  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
  // - GROUP_ID_NOT_FOUND (version 0+)
  // - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
  // - FENCED_LEADER_EPOCH (version 0+)
  // - INVALID_REQUEST (version 0+)
  "fields": [
    { "name": "Results", "type": "[]ReadStateSummaryResult", "versions": "0+",
      "about": "The read results", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic identifier" },
      { "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
        "about" : "The results for the partitions.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The error message, or null if there was no error." },
        { "name": "StateEpoch", "type": "int32", "versions": "0+",
          "about": "The state epoch of the share-partition." },
        { "name": "StartOffset", "type": "int64", "versions": "0+",
          "about": "The share-partition start offset." }
      ]}
    ]}
  ]
}

...

Metric Name | Type | Group | Tags | Description | JMX Bean

last-poll-seconds-ago | Gauge | share-consumer-metrics | client-id | The number of seconds since the last poll() invocation. | kafka.consumer:type=share-consumer-metrics,name=last-poll-seconds-ago,client-id=([-.\w]+)

time-between-poll-avg | Meter | share-consumer-metrics | client-id | The average delay between invocations of poll() in milliseconds. | kafka.consumer:type=share-consumer-metrics,name=time-between-poll-avg,client-id=([-.\w]+)

time-between-poll-max | Meter | share-consumer-metrics | client-id | The maximum delay between invocations of poll() in milliseconds. | kafka.consumer:type=share-consumer-metrics,name=time-between-poll-max,client-id=([-.\w]+)

poll-idle-ratio-avg | Meter | share-consumer-metrics | client-id | The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records. | kafka.consumer:type=share-consumer-metrics,name=poll-idle-ratio-avg,client-id=([-.\w]+)

heartbeat-response-time-max | Meter | share-consumer-coordinator-metrics | client-id | The maximum time taken to receive a response to a heartbeat request in milliseconds. | kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-response-time-max,client-id=([-.\w]+)

heartbeat-rate | Meter | share-consumer-coordinator-metrics | client-id | The number of heartbeats per second. | kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-rate,client-id=([-.\w]+)

heartbeat-total | Meter | share-consumer-coordinator-metrics | client-id | The total number of heartbeats. | kafka.consumer:type=share-consumer-coordinator-metrics,name=heartbeat-total,client-id=([-.\w]+)

last-heartbeat-seconds-ago | Gauge | share-consumer-coordinator-metrics | client-id | The number of seconds since the last coordinator heartbeat was sent. | kafka.consumer:type=share-consumer-coordinator-metrics,name=last-heartbeat-seconds-ago,client-id=([-.\w]+)

rebalance-total | Meter | share-consumer-coordinator-metrics | client-id | The total number of rebalance events. | kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-total,client-id=([-.\w]+)

rebalance-rate-per-hour | Meter | share-consumer-coordinator-metrics | client-id | The number of rebalance events per hour. | kafka.consumer:type=share-consumer-coordinator-metrics,name=rebalance-rate-per-hour,client-id=([-.\w]+)

fetch-size-avg | Meter | share-consumer-fetch-manager-metrics | client-id | The average number of bytes fetched per request. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-size-avg,client-id=([-.\w]+)

fetch-size-max | Meter | share-consumer-fetch-manager-metrics | client-id | The maximum number of bytes fetched per request. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-size-max,client-id=([-.\w]+)

bytes-fetched-rate | Meter | share-consumer-fetch-manager-metrics | client-id | The average number of bytes fetched per second. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=bytes-fetched-rate,client-id=([-.\w]+)

bytes-fetched-total | Meter | share-consumer-fetch-manager-metrics | client-id | The total number of bytes fetched. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=bytes-fetched-total,client-id=([-.\w]+)

records-per-request-avg | Meter | share-consumer-fetch-manager-metrics | client-id | The average number of records in each request. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-per-request-avg,client-id=([-.\w]+)

records-per-request-max | Meter | share-consumer-fetch-manager-metrics | client-id | The maximum number of records in a request. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-per-request-max,client-id=([-.\w]+)

records-fetched-rate | Meter | share-consumer-fetch-manager-metrics | client-id | The average number of records fetched per second. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-fetched-rate,client-id=([-.\w]+)

records-fetched-total | Meter | share-consumer-fetch-manager-metrics | client-id | The total number of records fetched. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=records-fetched-total,client-id=([-.\w]+)

acknowledgements-send-rate | Meter | share-consumer-fetch-manager-metrics | client-id | The average number of record acknowledgements sent per second. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-send-rate,client-id=([-.\w]+)

acknowledgements-send-total | Meter | share-consumer-fetch-manager-metrics | client-id | The total number of record acknowledgements sent. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-send-total,client-id=([-.\w]+)

acknowledgements-error-rate | Meter | share-consumer-fetch-manager-metrics | client-id | The average number of record acknowledgements that resulted in errors per second. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-error-rate,client-id=([-.\w]+)

acknowledgements-error-total | Meter | share-consumer-fetch-manager-metrics | client-id | The total number of record acknowledgements that resulted in errors. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=acknowledgements-error-total,client-id=([-.\w]+)

fetch-latency-avg | Meter | share-consumer-fetch-manager-metrics | client-id | The average time taken for a fetch request. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-latency-avg,client-id=([-.\w]+)

fetch-latency-max | Meter | share-consumer-fetch-manager-metrics | client-id | The maximum time taken for any fetch request. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-latency-max,client-id=([-.\w]+)

fetch-rate | Meter | share-consumer-fetch-manager-metrics | client-id | The number of fetch requests per second. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-rate,client-id=([-.\w]+)

fetch-total | Meter | share-consumer-fetch-manager-metrics | client-id | The total number of fetch requests. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-total,client-id=([-.\w]+)

fetch-throttle-time-avg | Meter | share-consumer-fetch-manager-metrics | client-id | The average throttle time in milliseconds. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-throttle-time-avg,client-id=([-.\w]+)

fetch-throttle-time-max | Meter | share-consumer-fetch-manager-metrics | client-id | The maximum throttle time in milliseconds. | kafka.consumer:type=share-consumer-fetch-manager-metrics,name=fetch-throttle-time-max,client-id=([-.\w]+)
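The JMX beans in the table can be discovered at runtime with the standard platform MBean server. The sketch below is illustrative (the class name is invented for the example); it uses only the JDK's javax.management API, and in a JVM with no running share consumer the query simply returns an empty set.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch: look up the share consumer metrics in the table above through JMX.
public class ShareConsumerMetricsQuery {

    // Queries all beans of the given metrics type, e.g.
    // "share-consumer-metrics" or "share-consumer-fetch-manager-metrics".
    public static Set<ObjectName> query(String type) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            // The trailing ",*" makes this a pattern matching any metric name
            // and client-id, mirroring the bean names listed in the table.
            ObjectName pattern = new ObjectName("kafka.consumer:type=" + type + ",*");
            return server.queryNames(pattern, null);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```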

...