Current state: Accepted
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Currently, the consumer reports a lag metric: the distance between the high watermark of a log and the consumer offset. It would be useful to report a similar lead metric: the distance between the consumer offset and the start offset of the log. If the lead gets close to 0, it is an indication that the consumer may stall or lose data soon. It would also be useful to know the lead per partition.
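To make the two quantities concrete, the following is a minimal sketch of the arithmetic only. The class and method names (LeadSketch, lead, lag, minLead) and the sample offsets are illustrative assumptions and not part of this proposal; the consumer would compute these values internally.

```java
public class LeadSketch {
    // Lead: how far the consumer position is ahead of the log start offset.
    public static long lead(long consumerPosition, long logStartOffset) {
        return consumerPosition - logStartOffset;
    }

    // Lag: how far the consumer position is behind the high watermark.
    public static long lag(long highWatermark, long consumerPosition) {
        return highWatermark - consumerPosition;
    }

    // Minimum lead across partitions; index i in both arrays refers to the same partition.
    public static long minLead(long[] consumerPositions, long[] logStartOffsets) {
        if (consumerPositions.length == 0 || consumerPositions.length != logStartOffsets.length) {
            throw new IllegalArgumentException("arrays must be non-empty and of equal length");
        }
        long min = Long.MAX_VALUE;
        for (int i = 0; i < consumerPositions.length; i++) {
            min = Math.min(min, lead(consumerPositions[i], logStartOffsets[i]));
        }
        return min;
    }

    public static void main(String[] args) {
        // Hypothetical offsets for three partitions.
        long[] positions = {120L, 7L, 300L};
        long[] logStarts = {100L, 5L, 0L};

        System.out.println("lead of partition 0 = " + lead(positions[0], logStarts[0])); // 20
        System.out.println("lag of partition 0 = " + lag(500L, positions[0]));           // 380
        System.out.println("min lead = " + minLead(positions, logStarts));               // 2
    }
}
```

In this example the second partition has a lead of only 2, so it is the one closest to the log start offset and the one a min lead metric would surface.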
There is no programmatic public interface change. We are only adding new metrics.
Add a min lead metric across all partitions to KafkaConsumer. The metric name would be:
Also, add a per-partition lead metric to KafkaConsumer. The metric name would be:
The way to get the metrics is the same as getting other per-partition metrics. For example:
// Build the tags that identify the per-partition metrics.
TopicPartition tp = new TopicPartition("topic", 0);
HashMap<String, String> tags = new HashMap<>();
tags.put("client-id", "metricTestConsumer0");
tags.put("topic", tp.topic());
tags.put("partition", String.valueOf(tp.partition()));
// Look up each lead metric by name, group, and tags.
double currentPartitionLead = kafkaConsumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)).value();
double currentPartitionMinLead = kafkaConsumer.metrics().get(new MetricName("records-lead-min", "consumer-fetch-manager-metrics", "", tags)).value();
double currentPartitionAvgLead = kafkaConsumer.metrics().get(new MetricName("records-lead-avg", "consumer-fetch-manager-metrics", "", tags)).value();
When the consumer no longer owns a partition, all lead metrics, both client-level and partition-level, will be removed.
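Because lead metrics can be removed, a lookup may find nothing, and code that reads them should tolerate a missing entry. The following is a minimal sketch of that pattern. It uses a plain java.util.Map with string keys as a stand-in for the map returned by kafkaConsumer.metrics(); the class LeadMetricLookup, the method readLead, and the key "records-lead/topic-0" are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalDouble;

public class LeadMetricLookup {
    // Returns the metric value, or an empty result if the metric is not registered
    // (for example, because the partition is no longer owned by this consumer).
    public static <K> OptionalDouble readLead(Map<K, Double> metrics, K name) {
        Double value = metrics.get(name);
        return value == null ? OptionalDouble.empty() : OptionalDouble.of(value);
    }

    public static void main(String[] args) {
        // Stand-in for the consumer's metrics map; the key format is an assumption.
        Map<String, Double> metrics = new HashMap<>();
        metrics.put("records-lead/topic-0", 42.0);

        System.out.println(readLead(metrics, "records-lead/topic-0")); // metric present

        // Simulate the partition being revoked and its metric removed.
        metrics.remove("records-lead/topic-0");
        System.out.println(readLead(metrics, "records-lead/topic-0").isPresent()); // false
    }
}
```

Returning an OptionalDouble instead of dereferencing the lookup directly avoids a NullPointerException after a rebalance changes the set of owned partitions.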
Compatibility, Deprecation, and Migration Plan
The change is fully backwards compatible.