Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-16143
Motivation
KIP-848 introduced a new consumer group rebalance protocol and, with it, a new consumer implementation to support that protocol. The new protocol is referred to as CONSUMER, whereas the old protocol is called CLASSIC. The new consumer implementation, AsyncKafkaConsumer, uses the CONSUMER protocol, while the older LegacyKafkaConsumer uses the CLASSIC protocol. The metrics proposed here apply only to the new consumer using the CONSUMER rebalance protocol. They are not applicable to the CLASSIC protocol and will not exist when it is in use.
The new AsyncKafkaConsumer has some major design differences from the old consumer implementation, most notably its threading model. The new consumer uses two threads: one thread that handles API calls, and a background thread that handles network I/O with the broker as well as request and response handling. The proposed metrics would assist in debugging, provide a more insightful view of CONSUMER, and give the consumer a more holistic set of metrics.
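As a rough illustration of the two-thread model described above, the following minimal sketch hands events from an application thread to a background thread through a queue. All names here (`TwoThreadSketch`, `Event`, `submit`) are hypothetical and are not taken from the AsyncKafkaConsumer code; the "handling" is a placeholder for the network I/O a real client would perform.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of an application thread and a background thread
// communicating through an event queue. Not the actual Kafka implementation.
final class TwoThreadSketch {
    // An event carries a request plus a future the background thread completes.
    private static final class Event {
        final String request;
        final CompletableFuture<String> reply = new CompletableFuture<>();

        Event(String request) {
            this.request = request;
        }
    }

    private final BlockingQueue<Event> applicationEvents = new LinkedBlockingQueue<>();
    private final Thread background = new Thread(this::runLoop, "background-sketch");
    private volatile boolean running = true;

    TwoThreadSketch() {
        background.setDaemon(true);
        background.start();
    }

    // Background thread: repeatedly poll the queue and handle each event.
    private void runLoop() {
        while (running) {
            try {
                Event event = applicationEvents.poll(10, TimeUnit.MILLISECONDS);
                if (event != null) {
                    // Placeholder for network I/O and response handling.
                    event.reply.complete("handled:" + event.request);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Application thread: enqueue the event and return a future immediately.
    CompletableFuture<String> submit(String request) {
        Event event = new Event(request);
        applicationEvents.add(event);
        return event.reply;
    }

    // Number of events still waiting for the background thread.
    int queueSize() {
        return applicationEvents.size();
    }

    // Stop the background thread and wait briefly for it to exit.
    void close() {
        running = false;
        background.interrupt();
        try {
            background.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In a design like this, the depth of the queue and the time events spend waiting in it are not visible from the API thread alone, which is the kind of gap the proposed metrics are meant to close.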
Proposed Changes
We propose adding the metrics below to the consumer. They cover the new consumer components introduced in AsyncKafkaConsumer and give a more insightful set of metrics to those using the CONSUMER protocol.
Public Interfaces
New Metrics
Metric Name | Telemetry Metric Name | Data Type | Description | MBean |
---|---|---|---|---|
time-between-network-thread-poll-max | org.apache.kafka.consumer.time.between.network.thread.poll.max | long | The maximum time taken, in milliseconds, between each poll in the network thread | kafka.consumer:type=consumer-metrics,client-id={clientId} |
time-between-network-thread-poll-avg | org.apache.kafka.consumer.time.between.network.thread.poll.avg | long | The average time taken, in milliseconds, between each poll in the network thread | kafka.consumer:type=consumer-metrics,client-id={clientId} |
application-event-queue-size | org.apache.kafka.consumer.application.event.queue.size | int | The current number of events in the consumer network application event queue | kafka.consumer:type=consumer-metrics,client-id={clientId} |
application-event-queue-time-avg | org.apache.kafka.consumer.application.event.queue.time.avg | long | The average time, in milliseconds, that application events are taking to be dequeued | kafka.consumer:type=consumer-metrics,client-id={clientId} |
application-event-queue-time-max | org.apache.kafka.consumer.application.event.queue.time.max | long | The maximum time, in milliseconds, that an application event took to be dequeued | kafka.consumer:type=consumer-metrics,client-id={clientId} |
application-event-queue-processing-time-avg | org.apache.kafka.consumer.application.event.queue.processing.time.avg | long | The average time, in milliseconds, that the consumer network takes to process all available application events | kafka.consumer:type=consumer-metrics,client-id={clientId} |
application-event-queue-processing-time-max | org.apache.kafka.consumer.application.event.queue.processing.time.max | long | The maximum time, in milliseconds, that the consumer network took to process all available application events | kafka.consumer:type=consumer-metrics,client-id={clientId} |
unsent-requests-queue-size | org.apache.kafka.consumer.unsent.requests.queue.size | int | The current number of unsent requests in the consumer network | kafka.consumer:type=consumer-metrics,client-id={clientId} |
unsent-requests-queue-time-max | org.apache.kafka.consumer.unsent.requests.queue.time.max | long | The maximum time, in milliseconds, that a request remained unsent in the consumer network | kafka.consumer:type=consumer-metrics,client-id={clientId} |
unsent-requests-queue-time-avg | org.apache.kafka.consumer.unsent.requests.queue.time.avg | long | The average time, in milliseconds, that requests are taking to be sent in the consumer network | kafka.consumer:type=consumer-metrics,client-id={clientId} |
background-event-queue-size | org.apache.kafka.consumer.background.event.queue.size | int | The current number of events in the consumer background event queue | kafka.consumer:type=consumer-metrics,client-id={clientId} |
background-event-queue-time-avg | org.apache.kafka.consumer.background.event.queue.time.avg | long | The average time, in milliseconds, that background events are taking to be dequeued | kafka.consumer:type=consumer-metrics,client-id={clientId} |
background-event-queue-time-max | org.apache.kafka.consumer.background.event.queue.time.max | long | The maximum time, in milliseconds, that a background event took to be dequeued | kafka.consumer:type=consumer-metrics,client-id={clientId} |
background-event-queue-processing-time-avg | org.apache.kafka.consumer.background.event.queue.processing.time.avg | long | The average time, in milliseconds, that the consumer took to process all available background events | kafka.consumer:type=consumer-metrics,client-id={clientId} |
background-event-queue-processing-time-max | org.apache.kafka.consumer.background.event.queue.processing.time.max | long | The maximum time, in milliseconds, that the consumer took to process all available background events | kafka.consumer:type=consumer-metrics,client-id={clientId} |
application-events-expired-size | org.apache.kafka.consumer.application.event.expired.size | long | The current number of expired application events | kafka.consumer:type=consumer-metrics,client-id={clientId} |
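To make the distinction between the queue-size and queue-time metrics concrete, here is a minimal sketch of how such values could be derived: each event is stamped when it is enqueued, and the wait is recorded when it is dequeued. `QueueTimeTracker` and its methods are hypothetical names, not part of the Kafka client, and the sketch keeps simple running totals rather than whatever windowing the real metrics use. The clock is injected so the behaviour can be checked deterministically.

```java
import java.util.ArrayDeque;
import java.util.function.LongSupplier;

// Hypothetical helper: tracks queue size and how long events wait in the
// queue, in the spirit of the *-queue-size, *-queue-time-avg and
// *-queue-time-max metrics. Not the actual Kafka implementation.
final class QueueTimeTracker<T> {
    // Pairs an event with the time at which it was enqueued.
    private static final class Timed<E> {
        final E event;
        final long enqueuedMs;

        Timed(E event, long enqueuedMs) {
            this.event = event;
            this.enqueuedMs = enqueuedMs;
        }
    }

    private final ArrayDeque<Timed<T>> queue = new ArrayDeque<>();
    private final LongSupplier clockMs;
    private long dequeuedCount;
    private long totalWaitMs;
    private long maxWaitMs;

    QueueTimeTracker(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    // Enqueue an event, stamping it with the current time.
    synchronized void add(T event) {
        queue.addLast(new Timed<>(event, clockMs.getAsLong()));
    }

    // Dequeue the oldest event and record how long it waited; null if empty.
    synchronized T poll() {
        Timed<T> head = queue.pollFirst();
        if (head == null) {
            return null;
        }
        long waitedMs = clockMs.getAsLong() - head.enqueuedMs;
        dequeuedCount++;
        totalWaitMs += waitedMs;
        maxWaitMs = Math.max(maxWaitMs, waitedMs);
        return head.event;
    }

    // Current number of events waiting (the "queue-size" style value).
    synchronized int size() {
        return queue.size();
    }

    // Average wait across all dequeued events; 0.0 before any dequeue.
    synchronized double queueTimeAvgMs() {
        return dequeuedCount == 0 ? 0.0 : (double) totalWaitMs / dequeuedCount;
    }

    // Longest wait observed for any dequeued event.
    synchronized long queueTimeMaxMs() {
        return maxWaitMs;
    }
}
```

With a fake clock, two events enqueued at 0 ms and 10 ms and dequeued at 30 ms and 50 ms wait 30 ms and 40 ms respectively, giving an average of 35 ms and a maximum of 40 ms. The processing-time metrics would be measured separately, around the work done after events are dequeued.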
Compatibility, Deprecation, and Migration Plan
Since only new metrics are being proposed, there should be no compatibility issues.
Test Plan
Tests will be written for each of the new metrics managers to ensure that measurements are taken correctly.