Status
Current state: Accepted
Discussion thread: here
Vote thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP builds upon KIP-932, which introduced the concept of Queues/Cooperative Consumption in Kafka. KIP-932 implementation introduced Share Group, Share Partitions and Delayed Share Fetch purgatory to enable this cooperative consumption behavior. The proposed metrics in this KIP are broker metrics which will enhance the debugging and monitoring capabilities for cooperative consumption. By providing a more granular view of the underlying mechanisms, these broker metrics will aid in identifying and resolving potential issues and optimizing performance.
Public Interfaces
Rationale | Metric Name | Type | Group | Tags | Description | JMX Bean |
---|---|---|---|---|---|---|
Similar to existing fetch metric TotalFetchRequestsPerSec for Fetch, which record all topic cumulative and individual topic stats. | TotalShareFetchRequestsPerSec | Meter | BrokerTopicMetrics | topic:([-.\w]+) | The fetch request rate per second. | kafka.server:type=BrokerTopicMetrics,name=TotalShareFetchRequestsPerSec kafka.server:type=BrokerTopicMetrics,name=TotalShareFetchRequestsPerSec,topic={topic} |
Similar to existing fetch metric FailedFetchRequestsPerSec for Fetch, which record all topic cumulative and individual topic stats. | FailedShareFetchRequestsPerSec | Meter | BrokerTopicMetrics | topic:([-.\w]+) | The share fetch request rate for requests that failed. | kafka.server:type=BrokerTopicMetrics,name=FailedShareFetchRequestsPerSec kafka.server:type=BrokerTopicMetrics,name=FailedShareFetchRequestsPerSec,topic={topic} |
Similar to above defined total fetch metrics, this metric tracks the number of share acknowledgement requests stats. | TotalShareAcknowledgementRequestsPerSec | Meter | BrokerTopicMetrics | topic:([-.\w]+) | The acknowledgement request rate per second. | kafka.server:type=BrokerTopicMetrics,name=TotalShareAcknowledgementRequestsPerSec kafka.server:type=BrokerTopicMetrics,name=TotalShareAcknowledgementRequestsPerSec,topic={topic} |
Similar to above defined failed fetch metrics, this metric tracks the number of failed share acknowledgement requests stats. | FailedShareAcknowledgementRequestsPerSec | Meter | BrokerTopicMetrics | topic:([-.\w]+) | The share acknowledgement request rate for requests that failed | kafka.server:type=BrokerTopicMetrics,name=FailedShareAcknowledgementRequestsPerSec kafka.server:type=BrokerTopicMetrics,name=FailedShareAcknowledgementRequestsPerSec,topic={topic} |
Tracks the rate of records ackowledged per acknowledgement type. | RecordAcknowledgementsPerSec | Meter | ShareGroupMetrics |
| The rate per second of records acknowledged per acknowledgement type. | kafka.server:type=ShareGroupMetrics,name=RecordAcknowledgementsPerSec, |
Tracks the time to load the share partition. Elevated values may indicate prolonged initialization and warrant further investigation. | PartitionLoadTimeMs | Histogram | ShareGroupMetrics | The time taken to load the share partitions. |
| |
In shared consumption, topic-partitions are locked before fetching data to ensure exclusive access for each share fetch request. Multiple consumers can request the same partitions, but only a subset can acquire locks. This metric represents the ratio of successfully fetched topic-partitions to the total requested in a share fetch. A low ratio indicates that many requests are unable to fetch from a significant portion of the requested partitions. Potential Causes for Low Ratios:
This metric, in conjunction with other metrics and configuration parameters, can help optimize shared consumption by adjusting consumer counts, partition limits, and partition assignments. Example: If the metric reports low values and:
| RequestTopicPartitionsFetchRatio | Histogram | ShareGroupMetrics | group: <group_id> | The ratio of topic-partitions acquired to the total number of topic-partitions in share fetch request. | kafka.server:type=ShareGroupMetrics,name=RequestTopicPartitionsFetchRatio,group={group_id} |
Tracks the time to acquire a topic partition for share fetches. High values suggest topic-partitions contention. | TopicPartitionsAcquireTimeMs | Histogram | ShareGroupMetrics | group: <group_id> | The time elapsed (in millisecond) to acquire any topic partition for fetch. | kafka.server:type=ShareGroupMetrics,name=TopicPartitionsAcquireTimeMs,group={group_id} |
Tracks the rate of acquisition locks for records which are not | AcquisitionLockTimeoutPerSec | Meter | SharePartitionMetrics | group: <group_id> topic: <topic_name> partition: <partition> | The rate of acquisition locks for records which are not | kafka.server:type=SharePartitionMetrics,name=AcquisitionLockTimeoutPerSec,group={group_id},topic={topic_name},partition={partition} |
Defines the maximum number of in-flight messages per share partition, as set by group.share.partition.max.record.locks . A share partition cannot be acquired for fetching messages if this limit is reached. | InFlightMessageCount | Gauge | SharePartitionMetrics | group: <group_id> topic: <topic_name> partition: <partition> | The number of in-flight messages for the share partition. | kafka.server:type=SharePartitionMetrics,name=InFlightMessageCount,group={group_id},topic={topic_name},partition={partition} |
Specifies the number of batches currently being processed by a share partition. This metric provides insights into memory consumption and performance bottlenecks, aiding in optimization efforts. | InFlightBatchCount | Gauge | SharePartitionMetrics | group: <group_id> topic: <topic_name> partition: <partition> | The number of in-flight batches for the share partition. | kafka.server:type=SharePartitionMetrics,name=InFlightBatchCount,group={group_id},topic={topic_name},partition={partition} |
Specifies the number of messages in a single in-flight batch. | InFlightBatchMessageCount | Histogram | SharePartitionMetrics | group: <group_id> topic: <topic_name> partition: <partition> | The number of messages in the in-flight batch. | kafka.server:type=SharePartitionMetrics,name=InFlightBatchMessageCount,group={group_id},topic={topic_name},partition={partition} |
Tracks the duration of fetch lock acquisitions. | FetchLockTimeMs | Histogram | SharePartitionMetrics | group: <group_id> topic: <topic_name> partition: <partition> | The time elapsed (in milliseconds) while a share partition is held under lock for fetching messages. | kafka.server:type=SharePartitionMetrics,name=FetchLockTimeMs,group={group_id},topic={topic_name},partition={partition} |
Measures the proportion of time a share partition spends under lock for fetching messages. A low ratio suggests that the partition is not actively involved in fetching data. | FetchLockRatio | Meter | SharePartitionMetrics | group: <group_id> topic: <topic_name> partition: <partition> | The fraction of time share partition is held under lock. | kafka.server:type=SharePartitionMetrics,name=FetchLockRatio,group={group_id},topic={topic_name},partition={partition} |
Similar to IncrementalFetchSessionEvictionsPerSec | ShareSessionEvictionsPerSec | Meter | ShareSessionCache | The share session eviction rate per second. | kafka.server:type=ShareSessionCache,name=ShareSessionEvictionsPerSec | |
Similar to NumIncrementalFetchPartitionsCached | SharePartitionsCount | Gauge | ShareSessionCache | The number of cached share partitions. | kafka.server:type=ShareSessionCache,name=SharePartitionsCount | |
Similar to NumIncrementalFetchSessions | ShareSessionsCount | Gauge | ShareSessionCache | The number of cached share sessions. | kafka.server:type=ShareSessionCache,name=ShareSessionsCount | |
Similar to ExpiresPerSec for other existing delayed operations i.e. DelayedFetch , etc. | ExpiresPerSec | Meter | DelayedShareFetchMetrics | The expired delayed share fetch operation rate per second. | kafka.server:type=DelayedShareFetchMetrics,name=ExpiresPerSec | |
Add ShareFetch to existing MessageConversionsPerSec metric. | MessageConversionsPerSec (ShareFetch added to existing metric for Produce and Fetch) | Meter | BrokerTopicMetrics | topic:([-.\w]+) | The message format conversion rate, for Produce, Fetch or ShareFetch requests, per topic. Omitting ‘topic={…}’ will yield the all-topic rate. | kafka.server:type=BrokerTopicMetrics,name={Produce|Fetch|ShareFetch}MessageConversionsPerSec,topic=([-.\w]+) |
Add ShareFetch to existing purgatory metrics. | PurgatorySize (ShareFetch) | Gauge | DelayedOperationPurgatory | delayedOperation : ShareFetch | The number of requests waiting in the share fetch purgatory. This is high if share consumers use a large value for fetch.wait.max.ms | kafka.server:type=DelayedOperationPurgatory,delayedOperation=ShareFetch,name=PurgatorySize |
Add ShareFetch to existing purgatory metrics. | NumDelayedOperations (ShareFetch) | Gauge | DelayedOperationPurgatory | delayedOperation : ShareFetch | The number of delayed operations for share fetch purgatory. | kafka.server:type=DelayedOperationPurgatory,delayedOperation=ShareFetch,name=NumDelayedOperations |
Move share-group-metrics
defined in KIP-932 to Yammer based metrics.
Proposed Changes
We are proposing to add the above metrics. This will give a more insightful set of metrics for those using the Cooperative Consumption.
Compatibility, Deprecation, and Migration Plan
These are new metrics and as such shouldn't have compatibility concerns.
Test Plan
Unit and integration tests.
Rejected Alternatives
- The Meter metrics can be either Yammer or Kafka Metrics. To ensure uniformity in metric naming conventions and to align with existing Fetch and ShareFetch metrics (such as PurgatorySize), we suggest using Yammer metrics for all metrics introduced in this KIP, rather than Kafka Metrics.