Status

Current state: Accepted

Discussion thread: here

Vote thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP builds upon KIP-932, which introduced the concept of Queues/Cooperative Consumption in Kafka. The KIP-932 implementation introduced Share Groups, Share Partitions and the Delayed Share Fetch purgatory to enable this cooperative consumption behavior. The metrics proposed in this KIP are broker metrics that enhance the debugging and monitoring capabilities for cooperative consumption. By providing a more granular view of the underlying mechanisms, these broker metrics will help identify and resolve potential issues and optimize performance.

Public Interfaces

The proposed metrics are listed below. Each entry gives the Rationale, Metric Name, Type, Group, Tags, Description and JMX Bean.
Rationale: Similar to the existing TotalFetchRequestsPerSec metric for Fetch, which records both all-topic cumulative and per-topic stats.
Metric Name: TotalShareFetchRequestsPerSec
Type: Meter
Group: BrokerTopicMetrics
Tags: topic:([-.\w]+)
Description: The share fetch request rate per second.
JMX Bean:
kafka.server:type=BrokerTopicMetrics,name=TotalShareFetchRequestsPerSec
kafka.server:type=BrokerTopicMetrics,name=TotalShareFetchRequestsPerSec,topic={topic}
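
The two JMX Bean forms above differ only in the presence of the topic key. A monitoring client can tell them apart with the standard javax.management.ObjectName class. The Java sketch below is a minimal illustration. The helper class and its method names are hypothetical. Only the bean layout is taken from this KIP.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper: builds the BrokerTopicMetrics bean names listed in this KIP.
final class ShareFetchBeanNames {
    private static final String PREFIX = "kafka.server:type=BrokerTopicMetrics,name=";

    private ShareFetchBeanNames() {}

    // Wraps the checked exception so callers need no try/catch.
    private static ObjectName of(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(name, e);
        }
    }

    // All-topic (cumulative) bean: the name carries no topic key.
    static ObjectName allTopics(String metric) {
        return of(PREFIX + metric);
    }

    // Per-topic bean; topic names matching [-.\w]+ need no ObjectName quoting.
    static ObjectName forTopic(String metric, String topic) {
        return of(PREFIX + metric + ",topic=" + topic);
    }

    // Property-value pattern matching every per-topic bean of the metric.
    // It is not a property-list pattern (no ",*"), so the key count must match
    // and the all-topic bean is excluded.
    static ObjectName perTopicPattern(String metric) {
        return of(PREFIX + metric + ",topic=*");
    }
}
```

A JMX client could pass perTopicPattern("TotalShareFetchRequestsPerSec") to MBeanServerConnection.queryNames(pattern, null) to discover the per-topic beans. It could read the all-topic bean directly by name.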

Rationale: Similar to the existing FailedFetchRequestsPerSec metric for Fetch, which records both all-topic cumulative and per-topic stats.
Metric Name: FailedShareFetchRequestsPerSec
Type: Meter
Group: BrokerTopicMetrics
Tags: topic:([-.\w]+)
Description: The share fetch request rate for requests that failed.
JMX Bean:
kafka.server:type=BrokerTopicMetrics,name=FailedShareFetchRequestsPerSec
kafka.server:type=BrokerTopicMetrics,name=FailedShareFetchRequestsPerSec,topic={topic}

Rationale: Similar to the total fetch metrics defined above, this metric tracks the rate of share acknowledgement requests.
Metric Name: TotalShareAcknowledgementRequestsPerSec
Type: Meter
Group: BrokerTopicMetrics
Tags: topic:([-.\w]+)
Description: The acknowledgement request rate per second.
JMX Bean:
kafka.server:type=BrokerTopicMetrics,name=TotalShareAcknowledgementRequestsPerSec
kafka.server:type=BrokerTopicMetrics,name=TotalShareAcknowledgementRequestsPerSec,topic={topic}

Rationale: Similar to the failed fetch metrics defined above, this metric tracks the rate of failed share acknowledgement requests.
Metric Name: FailedShareAcknowledgementRequestsPerSec
Type: Meter
Group: BrokerTopicMetrics
Tags: topic:([-.\w]+)
Description: The share acknowledgement request rate for requests that failed.
JMX Bean:
kafka.server:type=BrokerTopicMetrics,name=FailedShareAcknowledgementRequestsPerSec
kafka.server:type=BrokerTopicMetrics,name=FailedShareAcknowledgementRequestsPerSec,topic={topic}

Rationale: Tracks the rate of records acknowledged per acknowledgement type.
Metric Name: RecordAcknowledgementsPerSec
Type: Meter
Group: ShareGroupMetrics
Tags: ackType:{Accept|Release|Reject}
Description: The rate per second of records acknowledged per acknowledgement type.
JMX Bean: kafka.server:type=ShareGroupMetrics,name=RecordAcknowledgementsPerSec,ackType={Accept|Release|Reject}

Rationale: Tracks the time to load the share partition. Elevated values may indicate prolonged initialization and warrant further investigation.
Metric Name: PartitionLoadTimeMs
Type: Histogram
Group: ShareGroupMetrics
Tags: (none)
Description: The time taken to load the share partitions.
JMX Bean: kafka.server:type=ShareGroupMetrics,name=PartitionLoadTimeMs

Rationale: In shared consumption, topic-partitions are locked before fetching data to ensure exclusive access for each share fetch request. Multiple consumers can request the same partitions, but only a subset can acquire locks.

This metric represents the ratio of successfully fetched topic-partitions to the total requested in a share fetch. A low ratio indicates that many requests are unable to fetch from a significant portion of the requested partitions.

Potential Causes for Low Ratios:

  • High Share Consumer Count: A large number of share consumers can lead to increased competition for partitions.
  • Strict Locks Limit: A low group.share.partition.max.record.locks value can prevent a share consumer from acquiring a partition once the limit is reached.
  • Fewer Topic Partitions: Fewer partitions can intensify competition, especially with a high share consumer count.
  • Suboptimal Partition Assignments: Inefficient assignments can hinder efficient consumption.

This metric, in conjunction with other metrics and configuration parameters, can help optimize shared consumption by adjusting consumer counts, partition limits, and partition assignments.

Example:

If the metric reports low values and:

  • InFlightMessageCount is high: Consider increasing group.share.partition.max.record.locks to allow more locks per share partition.
  • InFlightMessageCount is not high: Consider optimizing partition assignments for cooperative consumption. If assignments are optimal and lock contention persists, increasing topic partitions or reducing the number of share consumers might improve performance.
  • If the aforementioned optimizations have been implemented, further investigation should focus on the time taken to complete log fetches and acquire records, including the duration for which the fetch lock is held. The FetchLockTimeMs metric measures the time spent in this process. High values indicate potential opportunities to make fetching and acquiring messages more efficient.

Metric Name: RequestTopicPartitionsFetchRatio
Type: Histogram
Group: ShareGroupMetrics
Tags: group: <group_id>
Description: The ratio of topic-partitions acquired to the total number of topic-partitions in the share fetch request.
JMX Bean: kafka.server:type=ShareGroupMetrics,name=RequestTopicPartitionsFetchRatio,group={group_id}
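
The ratio and the troubleshooting order in the Example can be sketched in Java. This is a hypothetical illustration, not the broker's implementation. The class and method names, the low-ratio threshold and the boolean inputs are assumptions. The broker may also record the ratio on a different scale, for example as a percentage.

```java
// Hypothetical sketch of RequestTopicPartitionsFetchRatio and the triage steps
// from the Example; names and thresholds are illustrative assumptions.
final class FetchRatioTriage {
    private FetchRatioTriage() {}

    // Fraction of requested topic-partitions that were acquired, in [0, 1].
    // A request with no partitions is treated as fully served (an assumption).
    static double fetchRatio(int acquiredPartitions, int requestedPartitions) {
        if (requestedPartitions <= 0) {
            return 1.0;
        }
        return (double) acquiredPartitions / requestedPartitions;
    }

    enum Advice {
        NO_ACTION,
        INVESTIGATE_FETCH_LOCK_TIME,
        INCREASE_MAX_RECORD_LOCKS,
        REVIEW_ASSIGNMENTS_OR_SCALE
    }

    // Follows the order of the Example bullets:
    // 1. ratio not low -> nothing to do;
    // 2. earlier optimizations already applied -> look at FetchLockTimeMs;
    // 3. InFlightMessageCount high -> raise group.share.partition.max.record.locks;
    // 4. otherwise -> review assignments, then partitions or consumer count.
    static Advice advise(double ratio, double lowRatioThreshold,
                         boolean inFlightMessageCountHigh, boolean optimizationsApplied) {
        if (ratio >= lowRatioThreshold) {
            return Advice.NO_ACTION;
        }
        if (optimizationsApplied) {
            return Advice.INVESTIGATE_FETCH_LOCK_TIME;
        }
        if (inFlightMessageCountHigh) {
            return Advice.INCREASE_MAX_RECORD_LOCKS;
        }
        return Advice.REVIEW_ASSIGNMENTS_OR_SCALE;
    }
}
```

In practice, each input would come from the metrics in this KIP and would be judged over time, not from a single sample.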

Rationale: Tracks the time to acquire a topic partition for share fetches. High values suggest topic-partition contention.
Metric Name: TopicPartitionsAcquireTimeMs
Type: Histogram
Group: ShareGroupMetrics
Tags: group: <group_id>
Description: The time elapsed (in milliseconds) to acquire any topic partition for fetch.
JMX Bean: kafka.server:type=ShareGroupMetrics,name=TopicPartitionsAcquireTimeMs,group={group_id}

Rationale: Tracks the rate of acquisition locks for records that are not acknowledged within the timeout. The timeout is set by group.share.record.lock.duration.ms.
Metric Name: AcquisitionLockTimeoutPerSec
Type: Meter
Group: SharePartitionMetrics
Tags: group: <group_id>, topic: <topic_name>, partition: <partition>
Description: The rate of acquisition locks for records that are not acknowledged within the timeout.
JMX Bean: kafka.server:type=SharePartitionMetrics,name=AcquisitionLockTimeoutPerSec,group={group_id},topic={topic_name},partition={partition}
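
The timeout condition behind this metric can be written as a small predicate. The Java sketch below is hypothetical. It assumes a record's lock expires once the configured lock duration has elapsed since acquisition without an acknowledgement, and the boundary (>=) is also an assumption. A broker would more likely schedule a timer per acquired batch than scan records like this.

```java
// Hypothetical sketch: when does an acquisition lock count as timed out?
// lockDurationMs stands in for group.share.record.lock.duration.ms.
final class AcquisitionLockSketch {
    private AcquisitionLockSketch() {}

    static boolean timedOut(long acquiredAtMs, long lockDurationMs, long nowMs, boolean acknowledged) {
        // An acknowledged record has released its lock, so it cannot time out.
        if (acknowledged) {
            return false;
        }
        return nowMs - acquiredAtMs >= lockDurationMs;
    }

    // Counts records whose locks have expired at nowMs; a rate would be this
    // count accumulated per unit of time.
    static int countTimedOut(long[] acquiredAtMs, boolean[] acknowledged, long lockDurationMs, long nowMs) {
        if (acquiredAtMs.length != acknowledged.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        int count = 0;
        for (int i = 0; i < acquiredAtMs.length; i++) {
            if (timedOut(acquiredAtMs[i], lockDurationMs, nowMs, acknowledged[i])) {
                count++;
            }
        }
        return count;
    }
}
```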

Rationale: The maximum number of in-flight messages per share partition is set by group.share.partition.max.record.locks. A share partition cannot be acquired for fetching messages once this limit is reached, so this gauge shows how close a partition is to that limit.
Metric Name: InFlightMessageCount
Type: Gauge
Group: SharePartitionMetrics
Tags: group: <group_id>, topic: <topic_name>, partition: <partition>
Description: The number of in-flight messages for the share partition.
JMX Bean: kafka.server:type=SharePartitionMetrics,name=InFlightMessageCount,group={group_id},topic={topic_name},partition={partition}
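
The relationship between InFlightMessageCount and group.share.partition.max.record.locks reduces to a simple comparison. The Java sketch below assumes the limit is reached when the count equals the configured value. The class and method names are hypothetical.

```java
// Hypothetical sketch: compares InFlightMessageCount with the configured
// group.share.partition.max.record.locks value.
final class InFlightLimit {
    private InFlightLimit() {}

    // True while the share partition can still be acquired for fetching.
    static boolean canAcquire(int inFlightMessageCount, int maxRecordLocks) {
        return inFlightMessageCount < maxRecordLocks;
    }

    // Remaining headroom before the limit is reached, never negative.
    static int headroom(int inFlightMessageCount, int maxRecordLocks) {
        return Math.max(0, maxRecordLocks - inFlightMessageCount);
    }
}
```

A headroom that stays near zero while RequestTopicPartitionsFetchRatio is low matches the first case in the Example above.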

Rationale: Tracks the number of batches currently being processed by a share partition. This metric provides insight into memory consumption and performance bottlenecks, aiding optimization efforts.
Metric Name: InFlightBatchCount
Type: Gauge
Group: SharePartitionMetrics
Tags: group: <group_id>, topic: <topic_name>, partition: <partition>
Description: The number of in-flight batches for the share partition.
JMX Bean: kafka.server:type=SharePartitionMetrics,name=InFlightBatchCount,group={group_id},topic={topic_name},partition={partition}

Rationale: Tracks the number of messages in each in-flight batch.
Metric Name: InFlightBatchMessageCount
Type: Histogram
Group: SharePartitionMetrics
Tags: group: <group_id>, topic: <topic_name>, partition: <partition>
Description: The number of messages in the in-flight batch.
JMX Bean: kafka.server:type=SharePartitionMetrics,name=InFlightBatchMessageCount,group={group_id},topic={topic_name},partition={partition}

Rationale: Tracks how long the fetch lock is held each time it is acquired.
Metric Name: FetchLockTimeMs
Type: Histogram
Group: SharePartitionMetrics
Tags: group: <group_id>, topic: <topic_name>, partition: <partition>
Description: The time elapsed (in milliseconds) while a share partition is held under lock for fetching messages.
JMX Bean: kafka.server:type=SharePartitionMetrics,name=FetchLockTimeMs,group={group_id},topic={topic_name},partition={partition}

Rationale: Measures the proportion of time a share partition spends under lock for fetching messages. A low ratio suggests that the partition is not actively involved in fetching data.
Metric Name: FetchLockRatio
Type: Meter
Group: SharePartitionMetrics
Tags: group: <group_id>, topic: <topic_name>, partition: <partition>
Description: The fraction of time the share partition is held under lock.
JMX Bean: kafka.server:type=SharePartitionMetrics,name=FetchLockRatio,group={group_id},topic={topic_name},partition={partition}
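
One way to read FetchLockRatio is as the share of a measurement window covered by fetch lock holds, which are the intervals FetchLockTimeMs samples. The Java sketch below is a hypothetical illustration of that fraction and does not describe how the broker computes the metric. It assumes the hold intervals do not overlap because the lock is exclusive, and it clips holds to the window.

```java
// Hypothetical sketch: fraction of [windowStartMs, windowEndMs) during which the
// share partition held its fetch lock, given the individual lock hold intervals.
final class FetchLockRatioSketch {
    private FetchLockRatioSketch() {}

    static double ratio(long[] holdStartMs, long[] holdEndMs, long windowStartMs, long windowEndMs) {
        if (holdStartMs.length != holdEndMs.length) {
            throw new IllegalArgumentException("arrays must have the same length");
        }
        long window = windowEndMs - windowStartMs;
        if (window <= 0) {
            throw new IllegalArgumentException("window must be non-empty");
        }
        long lockedMs = 0;
        for (int i = 0; i < holdStartMs.length; i++) {
            // Clip each hold to the window; holds are assumed not to overlap.
            long start = Math.max(holdStartMs[i], windowStartMs);
            long end = Math.min(holdEndMs[i], windowEndMs);
            if (end > start) {
                lockedMs += end - start;
            }
        }
        return (double) lockedMs / window;
    }
}
```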

Rationale: Similar to IncrementalFetchSessionEvictionsPerSec.
Metric Name: ShareSessionEvictionsPerSec
Type: Meter
Group: ShareSessionCache
Tags: (none)
Description: The share session eviction rate per second.
JMX Bean: kafka.server:type=ShareSessionCache,name=ShareSessionEvictionsPerSec

Rationale: Similar to NumIncrementalFetchPartitionsCached.
Metric Name: SharePartitionsCount
Type: Gauge
Group: ShareSessionCache
Tags: (none)
Description: The number of cached share partitions.
JMX Bean: kafka.server:type=ShareSessionCache,name=SharePartitionsCount

Rationale: Similar to NumIncrementalFetchSessions.
Metric Name: ShareSessionsCount
Type: Gauge
Group: ShareSessionCache
Tags: (none)
Description: The number of cached share sessions.
JMX Bean: kafka.server:type=ShareSessionCache,name=ShareSessionsCount

Rationale: Similar to the ExpiresPerSec metric for other existing delayed operations, such as DelayedFetch.
Metric Name: ExpiresPerSec
Type: Meter
Group: DelayedShareFetchMetrics
Tags: (none)
Description: The rate per second of expired delayed share fetch operations.
JMX Bean: kafka.server:type=DelayedShareFetchMetrics,name=ExpiresPerSec

Rationale: Add ShareFetch to the existing MessageConversionsPerSec metric.
Metric Name: MessageConversionsPerSec (ShareFetch added to the existing metric for Produce and Fetch)
Type: Meter
Group: BrokerTopicMetrics
Tags: topic:([-.\w]+)
Description: The message format conversion rate for Produce, Fetch or ShareFetch requests, per topic. Omitting ‘topic={…}’ yields the all-topic rate.
JMX Bean: kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch|ShareFetch}MessageConversionsPerSec,topic=([-.\w]+)

Rationale: Add ShareFetch to the existing purgatory metrics.
Metric Name: PurgatorySize (ShareFetch)
Type: Gauge
Group: DelayedOperationPurgatory
Tags: delayedOperation: ShareFetch
Description: The number of requests waiting in the share fetch purgatory. This value can be high if share consumers use a large value for fetch.max.wait.ms.
JMX Bean: kafka.server:type=DelayedOperationPurgatory,delayedOperation=ShareFetch,name=PurgatorySize
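
The link between PurgatorySize and the consumer's maximum fetch wait can be estimated with Little's law. This is a standard queueing result, not a formula from this KIP. Assume share fetch requests enter the purgatory at a steady rate λ and wait an average time W, where W is bounded by fetch.max.wait.ms. Under those assumptions, the average number of waiting requests is approximately:

```latex
L \approx \lambda \cdot W, \qquad W \le \texttt{fetch.max.wait.ms}

% Example: \lambda = 200 requests/s, W = 500 ms = 0.5 s
L \approx 200 \,\text{s}^{-1} \times 0.5 \,\text{s} = 100 \text{ waiting requests}
```

When little data is available, most requests wait close to the maximum. In that case, raising fetch.max.wait.ms raises PurgatorySize roughly in proportion.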

Rationale: Add ShareFetch to the existing purgatory metrics.
Metric Name: NumDelayedOperations (ShareFetch)
Type: Gauge
Group: DelayedOperationPurgatory
Tags: delayedOperation: ShareFetch
Description: The number of delayed operations in the share fetch purgatory.
JMX Bean: kafka.server:type=DelayedOperationPurgatory,delayedOperation=ShareFetch,name=NumDelayedOperations

Move the share-group-metrics defined in KIP-932 to Yammer-based metrics.

Proposed Changes

We are proposing to add the above metrics. They will provide a more insightful set of metrics for users of cooperative consumption.

Compatibility, Deprecation, and Migration Plan

These are new metrics and as such shouldn't have compatibility concerns.

Test Plan

Unit and integration tests.

Rejected Alternatives

  • The Meter metrics can be either Yammer or Kafka Metrics. To ensure uniformity in metric naming conventions and to align with existing Fetch and ShareFetch metrics (such as PurgatorySize), we suggest using Yammer metrics for all metrics introduced in this KIP, rather than Kafka Metrics.