Status
Current state: Under Discussion
Discussion thread: Link
JIRA: KAFKA-20386
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Kafka client has good metrics for network I/O, fetch behaviour, and group coordination, but the client-side processing pipeline has blind spots:
- Serialization/deserialization has no timing or per-key/value size metrics. The producer reports record-size-avg/max for total record size, but there is no way to see key vs. value size independently or to measure how long serialize()/deserialize() takes. When a schema change turns a 200us deserialization into 2ms, the regression is invisible, and teams chase broker and network metrics before discovering that their own serde is the bottleneck. Custom SerDes calling schema registries or decryption services can dominate processing time with no visibility.
- Interceptors execute in the hot path with no metrics. ProducerInterceptor.onSend() runs on every record and ConsumerInterceptor.onConsume() runs on every batch returned by poll(), but there are no timing or error metrics. A slow interceptor is indistinguishable from a slow broker. The client already catches and suppresses interceptor exceptions, so adding timing and error counts around those call sites is minimal effort.
Proposed Changes
Add client-side metrics using the existing Metrics/Sensor infrastructure. Timing metrics are recorded only at metrics.recording.level=DEBUG, which avoids nanoTime() overhead at the default level. Size and error metrics are INFO (always on).
Producer Serialization Metrics
Group: producer-metrics
| Metric Name | Type | Level | Tags | Description |
|---|---|---|---|---|
| serialization-time-ns-avg | Avg | DEBUG | serializer, type | Avg nanoseconds for Serializer.serialize() |
| serialization-time-ns-max | Max | DEBUG | serializer, type | Max nanoseconds for Serializer.serialize() |
| serialized-size-bytes-avg | Avg | INFO | serializer, type | Avg size of byte[] returned by serialize() |
| serialized-size-bytes-max | Max | INFO | serializer, type | Max size of byte[] returned by serialize() |
| serialization-error-rate | Rate | INFO | serializer, type | Rate of SerializationException |
| serialization-error-total | CumulativeCount | INFO | serializer, type | Total SerializationException count |
Tags: serializer = simple class name (e.g. StringSerializer), type = key or value. All serde and interceptor metrics also carry the standard client-id tag already present on all client metrics, which distinguishes producers/consumers using the same serializer class against different backends.
Differs from existing record-size-avg: that metric measures total record size (headers + key + value). The new metric measures key or value bytes independently.
Tag cardinality: Bounded by the number of distinct serializer class names configured on the client (typically 1-2) and is static for the producer's lifetime. Two producers using KafkaAvroSerializer with different schema registries report under the same serializer tag but are distinguished by client-id.
Null values (tombstones): When serialize() returns null (tombstone), no size is recorded for that call. This avoids skewing the average with zero-byte entries that don't represent real serialization work.
Scope: These metrics cover key and value serialization only. Header serialization/deserialization is out of scope for this KIP.
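The recording rules above (timing only at DEBUG, size always on, no size for tombstones, errors counted and rethrown) can be sketched as follows. This is a minimal, single-threaded illustration using only the JDK, not the proposed implementation: AvgMax and InstrumentedSerializer are hypothetical names, a plain Function stands in for Serializer.serialize(), and RuntimeException stands in for SerializationException. A real change would record into Sensor instances instead.

```java
import java.util.function.Function;

// Hypothetical stand-in for a Sensor carrying Avg and Max stats; not a Kafka class.
final class AvgMax {
    private long count;
    private long sum;
    private long max;

    void record(long value) {
        count++;
        sum += value;
        max = Math.max(max, value);
    }

    long count() { return count; }
    double avg() { return count == 0 ? Double.NaN : (double) sum / count; }
    long max() { return max; }
}

// Hypothetical wrapper around one key or value serializer (not thread-safe).
final class InstrumentedSerializer<T> {
    final String serializerTag;            // simple class name, used as the "serializer" tag
    final AvgMax sizeBytes = new AvgMax(); // INFO: always recorded
    final AvgMax timeNs = new AvgMax();    // DEBUG: recorded only when timing is enabled
    long errorTotal;                       // INFO: always recorded

    private final Function<T, byte[]> delegate;
    private final boolean timingEnabled;

    InstrumentedSerializer(String serializerTag, Function<T, byte[]> delegate, boolean timingEnabled) {
        this.serializerTag = serializerTag;
        this.delegate = delegate;
        this.timingEnabled = timingEnabled;
    }

    byte[] serialize(T data) {
        // Skip the nanoTime() call entirely unless timing is enabled.
        long start = timingEnabled ? System.nanoTime() : 0L;
        byte[] bytes;
        try {
            bytes = delegate.apply(data);
        } catch (RuntimeException e) {
            // Count the failure, then let it propagate unchanged.
            errorTotal++;
            throw e;
        }
        if (timingEnabled) {
            timeNs.record(System.nanoTime() - start);
        }
        // Tombstones (null results) contribute no size sample.
        if (bytes != null) {
            sizeBytes.record(bytes.length);
        }
        return bytes;
    }
}
```

With timing disabled, serializing "ab", "abcd", and a null value records two size samples (average 3 bytes, max 4) and no timing samples.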
Consumer Deserialization Metrics
Group: consumer-metrics
| Metric Name | Type | Level | Tags | Description |
|---|---|---|---|---|
| deserialization-time-ns-avg | Avg | DEBUG | deserializer, type | Avg nanoseconds for Deserializer.deserialize() |
| deserialization-time-ns-max | Max | DEBUG | deserializer, type | Max nanoseconds for Deserializer.deserialize() |
| deserialized-size-bytes-avg | Avg | INFO | deserializer, type | Avg size of input byte[] to deserialize() |
| deserialized-size-bytes-max | Max | INFO | deserializer, type | Max size of input byte[] to deserialize() |
| deserialization-error-rate | Rate | INFO | deserializer, type | Rate of deserialization errors |
| deserialization-error-total | CumulativeCount | INFO | deserializer, type | Total deserialization error count |
There is currently no per-record size metric on the consumer side. This fills that gap.
NOT measured: Header serialization/deserialization (future work), and time spent in user code between poll() and iteration.
Producer Interceptor Metrics
Group: producer-metrics
| Metric Name | Type | Level | Tags | Description |
|---|---|---|---|---|
| interceptor-onSend-time-ns-avg | Avg | DEBUG | interceptor | Avg nanoseconds for onSend() |
| interceptor-onSend-time-ns-max | Max | DEBUG | interceptor | Max nanoseconds for onSend() |
| interceptor-onSend-error-rate | Rate | INFO | interceptor | Rate of exceptions in onSend() |
| interceptor-onSend-error-total | CumulativeCount | INFO | interceptor | Total exceptions in onSend() |
| interceptor-onAcknowledgement-time-ns-avg | Avg | DEBUG | interceptor | Avg nanoseconds for onAcknowledgement() |
| interceptor-onAcknowledgement-time-ns-max | Max | DEBUG | interceptor | Max nanoseconds for onAcknowledgement() |
| interceptor-onAcknowledgement-error-rate | Rate | INFO | interceptor | Rate of exceptions in onAcknowledgement() |
| interceptor-onAcknowledgement-error-total | CumulativeCount | INFO | interceptor | Total exceptions in onAcknowledgement() |
Consumer Interceptor Metrics
Group: consumer-metrics
| Metric Name | Type | Level | Tags | Description |
|---|---|---|---|---|
| interceptor-onConsume-time-ns-avg | Avg | DEBUG | interceptor | Avg nanoseconds for onConsume() |
| interceptor-onConsume-time-ns-max | Max | DEBUG | interceptor | Max nanoseconds for onConsume() |
| interceptor-onConsume-error-rate | Rate | INFO | interceptor | Rate of exceptions in onConsume() |
| interceptor-onConsume-error-total | CumulativeCount | INFO | interceptor | Total exceptions in onConsume() |
| interceptor-onCommit-time-ns-avg | Avg | DEBUG | interceptor | Avg nanoseconds for onCommit() |
| interceptor-onCommit-time-ns-max | Max | DEBUG | interceptor | Max nanoseconds for onCommit() |
| interceptor-onCommit-error-rate | Rate | INFO | interceptor | Rate of exceptions in onCommit() |
| interceptor-onCommit-error-total | CumulativeCount | INFO | interceptor | Total exceptions in onCommit() |
Tag: interceptor = simple class name. One set of metrics per interceptor class. If interceptor.classes lists three classes, three metric instances are created.
Cardinality is bounded by the number of configured interceptors (typically 1-3).
As with the serde metrics, the client-id tag distinguishes interceptors across different producers and consumers.
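The per-interceptor behaviour described above (one set of stats per interceptor class, exceptions counted but still suppressed) could look roughly like the sketch below. It uses only the JDK: InterceptorStats and InstrumentedInterceptorChain are hypothetical names, and a UnaryOperator stands in for a single interceptor's onSend(). It is an illustration under those assumptions, not the actual ProducerInterceptors code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical per-interceptor counters; a real change would use Sensor stats instead.
final class InterceptorStats {
    long timedCalls;  // number of timing samples (DEBUG only)
    long totalNs;     // sum of timing samples, for the average
    long maxNs;       // largest timing sample
    long errorTotal;  // exceptions thrown by this interceptor (always counted)
}

// Hypothetical stand-in for the loop that invokes each configured interceptor in order.
final class InstrumentedInterceptorChain<R> {
    private final Map<String, UnaryOperator<R>> interceptors = new LinkedHashMap<>();
    private final Map<String, InterceptorStats> stats = new LinkedHashMap<>();
    private final boolean timingEnabled;

    InstrumentedInterceptorChain(boolean timingEnabled) {
        this.timingEnabled = timingEnabled;
    }

    // interceptorTag plays the role of the "interceptor" tag (simple class name).
    void add(String interceptorTag, UnaryOperator<R> onSend) {
        interceptors.put(interceptorTag, onSend);
        stats.put(interceptorTag, new InterceptorStats());
    }

    R onSend(R record) {
        R current = record;
        for (Map.Entry<String, UnaryOperator<R>> entry : interceptors.entrySet()) {
            InterceptorStats s = stats.get(entry.getKey());
            long start = timingEnabled ? System.nanoTime() : 0L;
            try {
                current = entry.getValue().apply(current);
            } catch (Exception e) {
                // The exception stays suppressed; the chain continues with the
                // last successfully produced record, and the failure is counted.
                s.errorTotal++;
            } finally {
                if (timingEnabled) {
                    long elapsed = System.nanoTime() - start;
                    s.timedCalls++;
                    s.totalNs += elapsed;
                    s.maxNs = Math.max(s.maxNs, elapsed);
                }
            }
        }
        return current;
    }

    InterceptorStats statsFor(String interceptorTag) {
        return stats.get(interceptorTag);
    }
}
```

Keeping the stats keyed by interceptor means a failing or slow interceptor shows up under its own tag rather than being averaged into the whole chain.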
Metric Placement
- Serialization timing/size: around keySerializer.serialize() and valueSerializer.serialize() in KafkaProducer.doSend()
- Deserialization timing/size: around keyDeserializer.deserialize() and valueDeserializer.deserialize() in CompletedFetch.fetchRecords()
- Interceptor timing: inside ProducerInterceptors/ConsumerInterceptors around each interceptor's method call (per interceptor class)
JMX
Standard MBean naming:
kafka.producer:type=producer-metrics,client-id={id},serializer={class},type={key|value}
kafka.producer:type=producer-metrics,client-id={id},interceptor={class}
kafka.consumer:type=consumer-metrics,client-id={id},deserializer={class},type={key|value}
kafka.consumer:type=consumer-metrics,client-id={id},interceptor={class}
...rest
New metrics have distinct tag sets and do not collide with existing MBeans.
Configuration
No new configuration. Timing metrics are gated by the existing metrics.recording.level:
- INFO (default): size, error, and gauge metrics active. No nanoTime() overhead.
- DEBUG: timing metrics also active.
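As a small illustration of opting in to the timing metrics, the sketch below builds client properties with the existing metrics.recording.level key set to DEBUG. The class name, the broker address, and the client id used in the usage line are placeholders chosen for this example; the resulting Properties object would be passed to a producer or consumer constructor as usual.

```java
import java.util.Properties;

// Hypothetical helper; only the three config keys are real Kafka client settings.
final class DebugMetricsConfig {
    static Properties clientProps(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // client.id becomes the client-id tag on every metric of this client.
        props.setProperty("client.id", clientId);
        // The default is INFO; DEBUG additionally enables the timing metrics.
        props.setProperty("metrics.recording.level", "DEBUG");
        return props;
    }
}
```

For example, DebugMetricsConfig.clientProps("localhost:9092", "orders-producer") yields a configuration whose size and error metrics behave as at INFO, with the timing sensors switched on in addition.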
Compatibility, Deprecation, and Migration Plan
Fully backwards compatible. New metrics added only.
Test Plan
- Unit tests for serde timing and size accuracy (producer and consumer)
- Unit tests for serde error counting on SerializationException
- Unit tests for null value (tombstone) handling -- no size recorded
- Unit tests for interceptor timing and error counting per method (producer and consumer)
- Unit tests for chained interceptors producing independent metric instances
- Integration tests for JMX MBean names and tag values including client-id
- Integration tests with custom Serializer/Deserializer and chained interceptors
Rejected Alternatives
Serde metrics as a pluggable reporter/decorator. Pushes the instrumentation burden onto every team. The client controls the call site, so measuring there exposes these metrics at zero cost to users.
Per-topic serde/interceptor metrics. Adding a topic tag would create unbounded cardinality for producers/consumers handling many topics. Per-client aggregation covers the common case (identifying that serde is slow); per-topic breakdown could be a follow-up if demand warrants it.