Status

Current state: Under Discussion

Discussion thread: Link

JIRA: KAFKA-20386

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka client has good metrics for network I/O, fetch behaviour, and group coordination, but the client-side processing pipeline has blind spots:

  1. Serialization/deserialization has no timing or per-key/value size metrics. The producer reports record-size-avg/max for total record size, but there is no way to see key and value sizes independently or to measure how long serialize()/deserialize() takes. When a schema change turns a 200 µs deserialization into 2 ms, the regression is invisible: teams chase broker and network metrics before discovering that their own serde is the bottleneck. Custom SerDes that call schema registries or decryption services can dominate processing time with no visibility.

  2. Interceptors execute in the hot path with no metrics. ProducerInterceptor.onSend() and ConsumerInterceptor.onConsume() run on every record, but there are no timing or error metrics, so a slow interceptor is indistinguishable from a slow broker. The client already catches and suppresses interceptor exceptions, so adding timing and error counts around those call sites takes minimal effort.

Proposed Changes

Add client-side metrics using the existing Metrics/Sensor infrastructure. Timing metrics are recorded only at metrics.recording.level=DEBUG, which avoids nanoTime() overhead at the default level. Size and error metrics are recorded at INFO (always on).

Producer Serialization Metrics

Group: producer-metrics

| Metric Name | Type | Level | Tags | Description |
|---|---|---|---|---|
| serialization-time-ns-avg | Avg | DEBUG | serializer, type | Avg nanoseconds for Serializer.serialize() |
| serialization-time-ns-max | Max | DEBUG | serializer, type | Max nanoseconds for Serializer.serialize() |
| serialized-size-bytes-avg | Avg | INFO | serializer, type | Avg size of byte[] returned by serialize() |
| serialized-size-bytes-max | Max | INFO | serializer, type | Max size of byte[] returned by serialize() |
| serialization-error-rate | Rate | INFO | serializer, type | Rate of SerializationException |
| serialization-error-total | CumulativeCount | INFO | serializer, type | Total SerializationException count |

Tags: serializer = simple class name (e.g. StringSerializer), type = key or value. All serde and interceptor metrics also carry the standard client-id tag already present on all client metrics, which distinguishes producers/consumers using the same serializer class against different backends.

Differs from existing record-size-avg: that metric measures total record size (headers + key + value). The new metric measures key or value bytes independently.

Tag cardinality: Bounded by the number of distinct serializer class names configured on the client (typically 1-2) and is static for the producer's lifetime. Two producers using KafkaAvroSerializer with different schema registries report under the same serializer tag but are distinguished by client-id.

Null values (tombstones): When serialize() returns null (tombstone), no size is recorded for that call. This avoids skewing the average with zero-byte entries that don't represent real serialization work.

Scope: These metrics cover key and value serialization only. Header serialization/deserialization is out of scope for this KIP.
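The recording rules above (timing only at DEBUG, size always, no size for tombstones, errors counted) can be sketched in plain Java. This is a minimal illustration, not Kafka's Metrics/Sensor API: the class SerdeRecorderSketch, its fields, and the Level enum are hypothetical names, RuntimeException stands in for SerializationException, and timing failed calls as well as successful ones is an assumption the KIP does not state.

```java
import java.util.function.Function;

/** Hypothetical stand-in for the proposed sensors; not Kafka's Metrics/Sensor API. */
final class SerdeRecorderSketch {
    enum Level { INFO, DEBUG }

    private final boolean timingEnabled;
    long timedCalls;   // calls that were timed (DEBUG only)
    long totalNanos;   // accumulated time spent in the serializer
    long sizedCalls;   // calls that returned a non-null byte[]
    long totalBytes;   // accumulated serialized size
    long maxBytes;     // largest serialized size seen
    long errors;       // calls that threw

    SerdeRecorderSketch(Level level) {
        this.timingEnabled = level == Level.DEBUG;
    }

    <T> byte[] serialize(Function<T, byte[]> serializer, T value) {
        // nanoTime() is only called when timing is enabled.
        long start = timingEnabled ? System.nanoTime() : 0L;
        try {
            byte[] out = serializer.apply(value);
            if (out != null) {
                // Tombstones (null results) record no size.
                sizedCalls++;
                totalBytes += out.length;
                maxBytes = Math.max(maxBytes, out.length);
            }
            return out;
        } catch (RuntimeException e) {
            // Stand-in for SerializationException: count it and rethrow.
            errors++;
            throw e;
        } finally {
            if (timingEnabled) {
                // Assumption: failed calls are timed too.
                timedCalls++;
                totalNanos += System.nanoTime() - start;
            }
        }
    }

    double avgBytes() {
        return sizedCalls == 0 ? Double.NaN : (double) totalBytes / sizedCalls;
    }
}
```

With Level.INFO the recorder never touches the clock, which is the property the DEBUG gating is meant to preserve; the average size is computed over non-null results only.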

Consumer Deserialization Metrics

Group: consumer-metrics

| Metric Name | Type | Level | Tags | Description |
|---|---|---|---|---|
| deserialization-time-ns-avg | Avg | DEBUG | deserializer, type | Avg nanoseconds for Deserializer.deserialize() |
| deserialization-time-ns-max | Max | DEBUG | deserializer, type | Max nanoseconds for Deserializer.deserialize() |
| deserialized-size-bytes-avg | Avg | INFO | deserializer, type | Avg size of input byte[] to deserialize() |
| deserialized-size-bytes-max | Max | INFO | deserializer, type | Max size of input byte[] to deserialize() |
| deserialization-error-rate | Rate | INFO | deserializer, type | Rate of deserialization errors |
| deserialization-error-total | CumulativeCount | INFO | deserializer, type | Total deserialization error count |

There is currently no per-record size metric on the consumer side; these metrics fill that gap.

Not measured: header serialization/deserialization (future work) and time spent in user code between poll() and iteration.

Producer Interceptor Metrics

Group: producer-metrics

| Metric Name | Type | Level | Tags | Description |
|---|---|---|---|---|
| interceptor-onSend-time-ns-avg | Avg | DEBUG | interceptor | Avg nanoseconds for onSend() |
| interceptor-onSend-time-ns-max | Max | DEBUG | interceptor | Max nanoseconds for onSend() |
| interceptor-onSend-error-rate | Rate | INFO | interceptor | Rate of exceptions in onSend() |
| interceptor-onSend-error-total | CumulativeCount | INFO | interceptor | Total exceptions in onSend() |
| interceptor-onAcknowledgement-time-ns-avg | Avg | DEBUG | interceptor | Avg nanoseconds for onAcknowledgement() |
| interceptor-onAcknowledgement-time-ns-max | Max | DEBUG | interceptor | Max nanoseconds for onAcknowledgement() |
| interceptor-onAcknowledgement-error-rate | Rate | INFO | interceptor | Rate of exceptions in onAcknowledgement() |
| interceptor-onAcknowledgement-error-total | CumulativeCount | INFO | interceptor | Total exceptions in onAcknowledgement() |

Consumer Interceptor Metrics

Group: consumer-metrics

| Metric Name | Type | Level | Tags | Description |
|---|---|---|---|---|
| interceptor-onConsume-time-ns-avg | Avg | DEBUG | interceptor | Avg nanoseconds for onConsume() |
| interceptor-onConsume-time-ns-max | Max | DEBUG | interceptor | Max nanoseconds for onConsume() |
| interceptor-onConsume-error-rate | Rate | INFO | interceptor | Rate of exceptions in onConsume() |
| interceptor-onConsume-error-total | CumulativeCount | INFO | interceptor | Total exceptions in onConsume() |
| interceptor-onCommit-time-ns-avg | Avg | DEBUG | interceptor | Avg nanoseconds for onCommit() |
| interceptor-onCommit-time-ns-max | Max | DEBUG | interceptor | Max nanoseconds for onCommit() |
| interceptor-onCommit-error-rate | Rate | INFO | interceptor | Rate of exceptions in onCommit() |
| interceptor-onCommit-error-total | CumulativeCount | INFO | interceptor | Total exceptions in onCommit() |


Tag: interceptor = simple class name. One set of metrics per interceptor class. If interceptor.classes lists three classes, three metric instances are created. 

Cardinality is bounded by the number of configured interceptors (typically 1-3).

As with the serde metrics, the client-id tag distinguishes the same interceptor class running in different producers and consumers.
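To make the per-class bookkeeping concrete, here is a minimal sketch of an interceptor chain that keeps one set of counters per interceptor simple class name and suppresses exceptions the way the Motivation section describes. It uses only the JDK: java.util.function.UnaryOperator stands in for ProducerInterceptor.onSend(), and InterceptorChainSketch, Stats, UpperCaseInterceptor, and FailingInterceptor are hypothetical names, not Kafka classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical sketch; UnaryOperator stands in for an interceptor's onSend(). */
final class InterceptorChainSketch<R> {
    static final class Stats {
        long calls;       // timed invocations (only when timing is enabled)
        long totalNanos;  // accumulated time in this interceptor
        long errors;      // exceptions thrown by this interceptor
    }

    private final List<UnaryOperator<R>> interceptors;
    private final boolean timingEnabled;
    final Map<String, Stats> statsByClass = new LinkedHashMap<>();

    InterceptorChainSketch(List<UnaryOperator<R>> interceptors, boolean timingEnabled) {
        this.interceptors = interceptors;
        this.timingEnabled = timingEnabled;
    }

    R onSend(R record) {
        R current = record;
        for (UnaryOperator<R> interceptor : interceptors) {
            // One Stats instance per simple class name, mirroring the interceptor tag.
            Stats stats = statsByClass.computeIfAbsent(
                    interceptor.getClass().getSimpleName(), k -> new Stats());
            long start = timingEnabled ? System.nanoTime() : 0L;
            try {
                current = interceptor.apply(current);
            } catch (Exception e) {
                // Suppressed: the next interceptor sees the last good record.
                stats.errors++;
            } finally {
                if (timingEnabled) {
                    stats.calls++;
                    stats.totalNanos += System.nanoTime() - start;
                }
            }
        }
        return current;
    }
}

/** Example interceptor that succeeds. */
final class UpperCaseInterceptor implements UnaryOperator<String> {
    @Override
    public String apply(String record) {
        return record.toUpperCase();
    }
}

/** Example interceptor that always throws. */
final class FailingInterceptor implements UnaryOperator<String> {
    @Override
    public String apply(String record) {
        throw new IllegalStateException("interceptor failure");
    }
}
```

Chaining UpperCaseInterceptor and FailingInterceptor yields two independent Stats entries; the failure is counted against FailingInterceptor only, and the record produced by the earlier interceptor is still returned.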

Metric Placement

  • Serialization timing/size: around keySerializer.serialize() and valueSerializer.serialize() in KafkaProducer.doSend()
  • Deserialization timing/size: around keyDeserializer.deserialize() and valueDeserializer.deserialize() in CompletedFetch.fetchRecords()
  • Interceptor timing: inside ProducerInterceptors/ConsumerInterceptors around each interceptor's method call (per interceptor class)

JMX

Standard MBean naming:

kafka.producer:type=producer-metrics,client-id={id},serializer={class},type={key|value}
kafka.producer:type=producer-metrics,client-id={id},interceptor={class}
kafka.consumer:type=consumer-metrics,client-id={id},deserializer={class},type={key|value}
kafka.consumer:type=consumer-metrics,client-id={id},interceptor={class}
...rest

New metrics have distinct tag sets and do not collide with existing MBeans.
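As an illustration of how a monitoring tool might locate these MBeans, the sketch below builds an ObjectName pattern for the producer interceptor MBean listed above and queries the platform MBean server with the standard javax.management API. The helper class InterceptorMBeanNames and the interceptor name AuditInterceptor used in the usage note are hypothetical; the name layout is taken from the listing above and assumes the client-id and class name need no JMX quoting.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper for locating the proposed producer interceptor MBeans. */
final class InterceptorMBeanNames {

    /** Name of the interceptor MBean for one producer, following the layout above. */
    static ObjectName concrete(String clientId, String interceptorClass) {
        return parse("kafka.producer:type=producer-metrics,client-id=" + clientId
                + ",interceptor=" + interceptorClass);
    }

    /** Pattern matching the given interceptor class under any client-id. */
    static ObjectName pattern(String interceptorClass) {
        return concrete("*", interceptorClass);
    }

    /** MBeans registered in this JVM that match the pattern (empty if none). */
    static Set<ObjectName> findRegistered(String interceptorClass) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        return server.queryNames(pattern(interceptorClass), null);
    }

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid MBean name: " + name, e);
        }
    }
}
```

Calling InterceptorMBeanNames.findRegistered("AuditInterceptor") inside a JVM that hosts producers would return one name per client-id using that interceptor class; in a JVM with no such producers it returns an empty set.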

Configuration

No new configuration. Timing metrics are gated by the existing metrics.recording.level:

  • INFO (default): size and error metrics active. No nanoTime() overhead.
  • DEBUG: timing metrics also active.
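For reference, turning on the timing metrics would only require the existing client setting. The sketch below assembles producer properties with plain java.util.Properties; the class name DebugMetricsConfig and the example argument values are illustrative, while the configuration keys are the existing client configs.

```java
import java.util.Properties;

/** Hypothetical helper that builds producer properties with DEBUG-level metrics. */
final class DebugMetricsConfig {
    static Properties producerProps(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // client.id becomes the client-id tag on every metric.
        props.setProperty("client.id", clientId);
        // Existing setting; DEBUG additionally enables the timing metrics.
        props.setProperty("metrics.recording.level", "DEBUG");
        return props;
    }
}
```

The returned Properties can be passed to the producer or consumer constructor as usual; leaving metrics.recording.level unset keeps the default INFO behaviour.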

Compatibility, Deprecation, and Migration Plan

Fully backwards compatible. This KIP only adds new metrics; no existing metric is changed or removed.

Test Plan

  • Unit tests for serde timing and size accuracy (producer and consumer)
  • Unit tests for serde error counting on SerializationException
  • Unit tests for null value (tombstone) handling -- no size recorded
  • Unit tests for interceptor timing and error counting per method (producer and consumer)
  • Unit tests for chained interceptors producing independent metric instances
  • Integration tests for JMX MBean names and tag values including client-id
  • Integration tests with custom Serializer/Deserializer and chained interceptors

Rejected Alternatives

Serde metrics as a pluggable reporter/decorator. This pushes the instrumentation burden onto every team. The client already controls the call site, so measuring there exposes these metrics at no cost to users.

Per-topic serde/interceptor metrics. Adding a topic tag would create unbounded cardinality for producers/consumers handling many topics. Per-client aggregation covers the common case (identifying that serde is slow); per-topic breakdown could be a follow-up if demand warrants it.

