Status

Current state: Voting in progress

Discussion thread: https://lists.apache.org/thread/z4o5093zqxmc18nzb74vyhvb1hb399v4

Vote thread: https://lists.apache.org/thread/jxf789v0kdpvsb9wsosghh57odkksxt7

JIRA: KAFKA-18088

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka consumer allows users to pause() and resume() individual partitions. When a partition is paused, subsequent calls to poll() do not return any records from that partition until it is resumed. Applications use this mechanism for application-level flow control and backpressure management.

Higher-level frameworks like Kafka Streams make extensive use of pause/resume internally:

  • Backpressure: When an internal record buffer for a partition exceeds its capacity, Kafka Streams pauses that partition to prevent further fetching. It resumes the partition once the buffer drains below the threshold.
  • Rebalance lifecycle: After a rebalance, Kafka Streams pauses all partitions that are not yet owned by fully initialized tasks. Partitions are only resumed after state store restoration completes.
  • Changelog restore management: The StoreChangelogReader pauses and resumes changelog partitions on the restore consumer depending on whether the corresponding tasks still exist.

Currently, there are no consumer metrics that expose pause/resume state. Operators and developers have no way to answer basic questions through monitoring:

  1. How many partitions are currently paused? A large or growing number of paused partitions may indicate backpressure issues, slow state restoration, or application-level processing bottlenecks.
  2. Which specific partitions are paused? Knowing which partitions are paused helps pinpoint which topics or partitions are experiencing issues.
  3. How long has a partition been paused? A partition that has been paused for an extended period may indicate a stuck consumer, an unrecoverable error in processing, or a state store restoration that is taking too long.
  4. How frequently are partitions being paused? A high rate of pause calls may indicate instability in the application's flow control logic or repeated rebalances triggering pause/resume cycles.

This KIP proposes adding new consumer metrics that track paused partitions and their pause duration.

Public Interfaces

New metrics

consumer-coordinator-metrics

Each metric will have the following tag:

  • client-id
Metric Name | Type | Description
paused-partitions | Gauge (Integer) | The current number of partitions that have been paused by the user via Consumer.pause().
paused-partitions-rate | Rate | The per-second rate of Consumer.pause() calls.
paused-partitions-total | CumulativeCount | The total cumulative number of Consumer.pause() calls.

consumer-fetch-manager-metrics

Each metric will have the following tags:

  • client-id
  • topic
  • partition
Metric Name | Type | Description
paused-partitions | Gauge (Integer) | Whether this partition is currently paused. Returns 1 if paused, 0 if not paused.
paused-partitions-duration-seconds | Gauge (Long) | The time in seconds since this partition was paused. Returns -1 if the partition is not currently paused.
paused-partitions-rate | Rate | The per-second rate of Consumer.pause() calls for this partition.
paused-partitions-total | CumulativeCount | The total cumulative number of Consumer.pause() calls for this partition.

Note: These are per-partition metrics, consistent with existing per-partition consumer metrics such as records-lag and records-lead. Cardinality increases linearly with the number of assigned partitions, which may increase memory usage in the metrics registry and in downstream monitoring systems.
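
The following back-of-the-envelope count quantifies the cardinality note. It is derived from the two tables and covers only the metrics this KIP adds. Each consumer client registers three coordinator-level metrics, plus four metrics per assigned partition. A consumer with N assigned partitions therefore adds:

```latex
M(N) = 3 + 4N, \qquad M(500) = 3 + 4 \cdot 500 = 2003
```

This count excludes existing metrics and any per-series overhead in downstream monitoring systems.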

Proposed Changes

Track Pause Timestamp

Currently, each partition's internal state tracks whether it is paused with a boolean flag. To support the paused-partitions-duration-seconds metric, this KIP adds a timestamp that records when the partition was paused. The timestamp is set when pause() is called, cleared (reset to -1) when resume() is called, and also reset to -1 on partition reassignment regardless of prior pause status. This ensures that a partition reassigned to the same consumer after a rebalance starts with a clean pause state.
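
The timestamp lifecycle described above can be illustrated with a minimal, standard-library-only sketch. It is not the consumer's actual SubscriptionState code, and it rests on several assumptions:

  • The class PauseTimestampSketch and its methods are hypothetical.
  • Plain strings such as "orders-0" stand in for TopicPartition.
  • The current time is passed in explicitly.
  • Pausing an already-paused partition keeps the original timestamp. The KIP does not specify this case.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the pause timestamp. String keys stand in for TopicPartition
 * and the clock is passed in explicitly; this is not Kafka's SubscriptionState code.
 */
class PauseTimestampSketch {
    static final long NOT_PAUSED = -1L;

    // partition -> time (ms) at which it was paused, or NOT_PAUSED
    private final Map<String, Long> pausedAtMs = new HashMap<>();

    /** (Re)assignment always starts from a clean, unpaused state. */
    void assign(String partition) {
        pausedAtMs.put(partition, NOT_PAUSED);
    }

    /** Assumption: pausing an already-paused partition keeps the original timestamp. */
    void pause(String partition, long nowMs) {
        if (pausedAt(partition) == NOT_PAUSED) {
            pausedAtMs.put(partition, nowMs);
        }
    }

    /** resume() clears the timestamp back to -1. */
    void resume(String partition) {
        pausedAt(partition); // validates that the partition is assigned
        pausedAtMs.put(partition, NOT_PAUSED);
    }

    boolean isPaused(String partition) {
        return pausedAt(partition) != NOT_PAUSED;
    }

    /** Value for paused-partitions-duration-seconds: whole seconds paused, or -1. */
    long pausedDurationSeconds(String partition, long nowMs) {
        long ts = pausedAt(partition);
        return ts == NOT_PAUSED ? -1L : (nowMs - ts) / 1000L;
    }

    private long pausedAt(String partition) {
        Long ts = pausedAtMs.get(partition);
        if (ts == null) {
            throw new IllegalStateException("Partition " + partition + " is not assigned");
        }
        return ts;
    }

    public static void main(String[] args) {
        PauseTimestampSketch state = new PauseTimestampSketch();
        state.assign("orders-0");
        state.pause("orders-0", 1_000L);
        System.out.println(state.pausedDurationSeconds("orders-0", 6_500L)); // 5
        state.assign("orders-0"); // reassignment after a rebalance
        System.out.println(state.pausedDurationSeconds("orders-0", 7_000L)); // -1
    }
}
```

The sketch truncates the duration to whole seconds, which fits the Long type in the table. The KIP does not say whether the real gauge truncates or rounds.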

consumer-coordinator-metrics

A paused-partitions gauge and the paused-partitions-rate and paused-partitions-total sensor metrics are registered at the coordinator level. When queried, the gauge computes the current number of paused partitions by reading the consumer's subscription state. The sensor metrics are recorded on each pause() call, which increments paused-partitions-total and contributes to paused-partitions-rate.
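
The split between an on-demand gauge and a per-call sensor can be shown with a minimal sketch. It uses only the Java standard library and makes several hypothetical choices:

  • The class CoordinatorPauseMetricsSketch and its methods are hypothetical.
  • A HashMap of paused flags stands in for the subscription state, and string keys stand in for TopicPartition.
  • The rate is a simple sliding window rather than Kafka's sampled Rate stat.

Following the table's wording, each pause() call counts as one event, however many partitions the call names.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the coordinator-level metrics. The rate is a simple sliding
 * window, not Kafka's sampled Rate stat.
 */
class CoordinatorPauseMetricsSketch {
    private final Map<String, Boolean> paused = new HashMap<>(); // stand-in for subscription state
    private final Deque<Long> pauseCallTimesMs = new ArrayDeque<>();
    private final long windowMs;
    private long pauseCallsTotal = 0L;

    CoordinatorPauseMetricsSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    void assign(Collection<String> partitions) {
        paused.clear();
        for (String p : partitions) {
            paused.put(p, false);
        }
    }

    /** Records one sensor event per pause() call. */
    void pause(Collection<String> partitions, long nowMs) {
        requireAssigned(partitions);
        for (String p : partitions) {
            paused.put(p, true);
        }
        pauseCallsTotal++;               // feeds paused-partitions-total
        pauseCallTimesMs.addLast(nowMs); // feeds paused-partitions-rate
        evictBefore(nowMs);
    }

    void resume(Collection<String> partitions) {
        requireAssigned(partitions);
        for (String p : partitions) {
            paused.put(p, false);
        }
    }

    /** paused-partitions gauge: counted from the current state only when queried. */
    int pausedPartitions() {
        int count = 0;
        for (boolean isPaused : paused.values()) {
            if (isPaused) count++;
        }
        return count;
    }

    long pausedPartitionsTotal() {
        return pauseCallsTotal;
    }

    /** paused-partitions-rate: pause() calls per second within the trailing window. */
    double pausedPartitionsRate(long nowMs) {
        evictBefore(nowMs);
        return pauseCallTimesMs.size() / (windowMs / 1000.0);
    }

    // Drops call timestamps that fall outside the window (nowMs - windowMs, nowMs].
    private void evictBefore(long nowMs) {
        while (!pauseCallTimesMs.isEmpty() && pauseCallTimesMs.peekFirst() <= nowMs - windowMs) {
            pauseCallTimesMs.pollFirst();
        }
    }

    private void requireAssigned(Collection<String> partitions) {
        for (String p : partitions) {
            if (!paused.containsKey(p)) {
                throw new IllegalStateException("Partition " + p + " is not assigned");
            }
        }
    }

    public static void main(String[] args) {
        CoordinatorPauseMetricsSketch m = new CoordinatorPauseMetricsSketch(10_000L);
        m.assign(Arrays.asList("orders-0", "orders-1", "orders-2"));
        m.pause(Arrays.asList("orders-0", "orders-1"), 1_000L);
        m.pause(Arrays.asList("orders-2"), 2_000L);
        m.resume(Arrays.asList("orders-1"));
        System.out.println(m.pausedPartitions());      // 2
        System.out.println(m.pausedPartitionsTotal()); // 2
    }
}
```

Computing the gauge on demand keeps pause() and resume() cheap. The cost moves to metric collection, which scans the assigned partitions.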

consumer-fetch-manager-metrics

The paused-partitions / paused-partitions-duration-seconds gauges and paused-partitions-rate / paused-partitions-total sensors are registered per partition in the fetch manager metrics, alongside the existing per-partition lag and lead metrics. They follow the same lifecycle:

  • On partition assignment: The metrics are registered for each newly assigned partition.
  • On partition revocation: The metrics are removed for each revoked partition, preventing stale metrics from accumulating.

The gauge metrics are implemented as on-demand gauges that read directly from the partition's pause state when queried, rather than being updated on every pause/resume call.
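
The per-partition lifecycle can be illustrated with a minimal, standard-library-only sketch: metrics are registered on assignment, read lazily when queried, and removed on revocation. It makes several hypothetical choices:

  • The class PartitionPauseMetricsSketch, its methods, and the key format are hypothetical.
  • A HashMap of suppliers stands in for Kafka's metrics registry, and string keys stand in for TopicPartition.
  • The paused-partitions-rate sensor is omitted for brevity.
  • A repeated pause() keeps the original timestamp but still counts as a call.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the per-partition metric lifecycle. A map of suppliers stands
 * in for Kafka's metrics registry; the rate sensor is omitted.
 */
class PartitionPauseMetricsSketch {
    private static final long NOT_PAUSED = -1L;
    private static final String[] METRIC_NAMES = {
        "paused-partitions", "paused-partitions-duration-seconds", "paused-partitions-total"
    };

    private final Map<String, Long> pausedAtMs = new HashMap<>(); // partition -> pause time or -1
    private final Map<String, Long> pauseCalls = new HashMap<>(); // partition -> pause() calls
    private final Map<String, Supplier<Number>> registry = new HashMap<>();
    private final LongSupplier clockMs;

    PartitionPauseMetricsSketch(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    /** On assignment: register gauges that read the pause state lazily when queried. */
    void onAssigned(Collection<String> partitions) {
        for (String p : partitions) {
            pausedAtMs.put(p, NOT_PAUSED);
            pauseCalls.put(p, 0L);
            registry.put(key(METRIC_NAMES[0], p), () -> pausedAtMs.get(p) == NOT_PAUSED ? 0 : 1);
            registry.put(key(METRIC_NAMES[1], p), () -> {
                long ts = pausedAtMs.get(p);
                return ts == NOT_PAUSED ? -1L : (clockMs.getAsLong() - ts) / 1000L;
            });
            registry.put(key(METRIC_NAMES[2], p), () -> pauseCalls.get(p));
        }
    }

    /** On revocation: remove the metrics so stale series do not accumulate. */
    void onRevoked(Collection<String> partitions) {
        for (String p : partitions) {
            for (String name : METRIC_NAMES) {
                registry.remove(key(name, p));
            }
            pausedAtMs.remove(p);
            pauseCalls.remove(p);
        }
    }

    void pause(String partition) {
        if (pausedAt(partition) == NOT_PAUSED) {
            pausedAtMs.put(partition, clockMs.getAsLong());
        }
        pauseCalls.merge(partition, 1L, Long::sum);
    }

    void resume(String partition) {
        pausedAt(partition); // validates that the partition is assigned
        pausedAtMs.put(partition, NOT_PAUSED);
    }

    /** Current value of a metric, or null if it is not registered. */
    Number read(String metricName, String partition) {
        Supplier<Number> gauge = registry.get(key(metricName, partition));
        return gauge == null ? null : gauge.get();
    }

    int registeredMetricCount() {
        return registry.size();
    }

    private long pausedAt(String partition) {
        Long ts = pausedAtMs.get(partition);
        if (ts == null) {
            throw new IllegalStateException("Partition " + partition + " is not assigned");
        }
        return ts;
    }

    private static String key(String metricName, String partition) {
        return metricName + "{partition=" + partition + "}";
    }

    public static void main(String[] args) {
        long[] now = {0L};
        PartitionPauseMetricsSketch m = new PartitionPauseMetricsSketch(() -> now[0]);
        m.onAssigned(Arrays.asList("orders-0", "orders-1"));
        m.pause("orders-0");
        now[0] = 3_000L;
        System.out.println(m.read("paused-partitions-duration-seconds", "orders-0")); // 3
        m.onRevoked(Arrays.asList("orders-0"));
        System.out.println(m.read("paused-partitions", "orders-0")); // null
    }
}
```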

Compatibility, Deprecation, and Migration Plan

This change only adds new metrics. No existing metrics or APIs are deprecated.

Test Plan

  • Verify that the pause timestamp is set when pause() is called, cleared on resume(), and reset on partition reassignment.
  • Verify that the coordinator-level metrics are registered and return the correct count as partitions are paused and resumed.
  • Verify that per-partition metrics are registered, return correct values, and are removed when partitions are revoked.

Rejected Alternatives

N/A
