Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-17958
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka Connect publishes metrics for both Source and Sink connectors that capture the number of records read (source-record-poll-[rate|total], sink-record-read-[rate|total]), records produced (source-record-write-[rate|total], sink-record-send-[rate|total]), and records that are in "transit" - read from the source but not yet produced to the target (source-record-active-count(-[max|avg]), sink-record-active-count(-[max|avg])). It also publishes the total number of records skipped due to failures (total-records-skipped).
These metrics are exposed by Kafka Connect via Java Management Extensions (JMX). In most deployments, Kafka Connect operators use external monitoring agents (jmxtrans, Telegraf, etc.) to collect these metrics at a fixed interval (e.g. every 30 seconds) and publish them to a backend (Prometheus, InfluxDB, etc.) for long-term storage and analysis.
A record read from the source might not be produced to the target for several reasons. A common example is a connector that uses the Filter transform to drop records. Other cases where records can be skipped are:
- The output of a transformation is a null value
- The connector fails to write to the target system and is configured with errors.tolerance=all
  - These failures are already covered by the total-records-skipped metric
Today, the best approximation of the number of filtered records is derived from the existing metrics, e.g.:
source-skipped-records = source-record-poll-total - (source-record-write-total + source-record-active-count)
sink-skipped-records = sink-record-read-total - (sink-record-send-total + sink-record-active-count)
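To make the arithmetic concrete, the two formulas above can be written as a small Java helper. This is a minimal sketch: the class and method names are hypothetical, and the input values are illustrative and assumed to come from a single, consistent snapshot of the metrics, which is exactly what a collector cannot guarantee today.

```java
// Hypothetical helper: derives skipped-record counts from a snapshot of the
// existing Kafka Connect task metrics, using the formulas above.
public class DerivedSkippedRecords {

    // source-skipped-records = source-record-poll-total - (source-record-write-total + source-record-active-count)
    public static long sourceSkipped(long pollTotal, long writeTotal, long activeCount) {
        return pollTotal - (writeTotal + activeCount);
    }

    // sink-skipped-records = sink-record-read-total - (sink-record-send-total + sink-record-active-count)
    public static long sinkSkipped(long readTotal, long sendTotal, long activeCount) {
        return readTotal - (sendTotal + activeCount);
    }

    public static void main(String[] args) {
        // Illustrative values only, assumed to be read at the same instant.
        System.out.println(sourceSkipped(1_000, 950, 30)); // prints 20
        System.out.println(sinkSkipped(500, 480, 5));      // prints 15
    }
}
```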
However, because metric sensors are not updated atomically, it is currently very hard, if not impossible, to answer the question "how many records have been skipped/filtered?". Collectors can poll metrics at any moment, so sensors can (and will) report values that reflect an intermediate state.
A classic example is a MirrorMaker connector replicating topics with relatively high traffic volumes. The poll-total and active-count sensors are incremented continuously, write-total is incremented (and active-count is decremented) when the batch completes, and all of this happens at a very high pace. We have dozens of MirrorMaker connectors with this usage pattern for which we cannot report accurate skipped record counts.
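The following single-threaded Java model sketches how such an intermediate state produces a wrong answer. It is a simplification, not Kafka code: the class and its counters are hypothetical stand-ins for the poll-total, write-total and active-count sensors, and the interleaving is forced deterministically to mimic a collector that reads one attribute, then another, while the task keeps running.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a source task's counters and a collector that reads
// them one at a time. Not Kafka's implementation.
public class NonAtomicScrape {
    // Each counter is individually thread-safe, like a single sensor.
    final AtomicLong pollTotal = new AtomicLong();
    final AtomicLong writeTotal = new AtomicLong();
    final AtomicLong activeCount = new AtomicLong();

    // Task polls a batch: poll-total and active-count grow.
    void poll(long n) {
        pollTotal.addAndGet(n);
        activeCount.addAndGet(n);
    }

    // Batch is written: write-total grows and active-count shrinks.
    void write(long n) {
        writeTotal.addAndGet(n);
        activeCount.addAndGet(-n);
    }

    static long derivedSkipped(long poll, long write, long active) {
        return poll - (write + active);
    }

    // Returns the skipped count a collector would derive from a non-atomic scrape.
    public static long simulate() {
        NonAtomicScrape task = new NonAtomicScrape();
        task.poll(1_000);
        task.write(1_000); // steady state: nothing skipped, nothing in flight

        long poll = task.pollTotal.get();      // collector reads poll-total: 1000
        task.poll(200);                        // meanwhile, the task polls...
        task.write(200);                       // ...and writes another batch
        long write = task.writeTotal.get();    // collector reads write-total: 1200
        long active = task.activeCount.get();  // collector reads active-count: 0

        return derivedSkipped(poll, write, active);
    }

    public static void main(String[] args) {
        // No record was skipped, yet the derived value is negative.
        System.out.println(simulate()); // prints -200
    }
}
```

Even though each counter is safe on its own, the three reads together do not form a consistent view. With a heavy, continuous stream, the derived value can be off in either direction depending on the order in which the collector reads the attributes.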
Proposed Changes
Update the SourceTaskMetricsGroup#recordWrite(...) and WorkerSinkTask#convertMessages(...) methods to publish the following metrics:
Metric Name | Metric Type | Description |
---|---|---|
source-record-skipped-rate | Rate | The average per-second number of records skipped (after transformations) by this task belonging to the named source connector in this worker. |
source-record-skipped-total | CumulativeSum | The total number of records skipped (after transformations) by this task belonging to the named source connector in this worker, since the task was last restarted. |
sink-record-skipped-rate | Rate | The average per-second number of records skipped by this task belonging to the named sink connector in this worker. |
sink-record-skipped-total | CumulativeSum | The total number of records skipped by this task belonging to the named sink connector in this worker, since the task was last restarted. |
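For comparison, the proposed approach records the skipped count at the point where the task already knows it, instead of deriving it afterwards from independently sampled sensors. The Java sketch below is a hypothetical illustration of that idea, not the actual change to SourceTaskMetricsGroup#recordWrite(...) or WorkerSinkTask#convertMessages(...). It assumes, as in Kafka Connect, that a transformation drops a record by returning null, and it keeps only a running total; in Kafka's metrics library a single Sensor can feed each recorded value to both a Rate and a CumulativeSum stat, which is how the rate and total metrics above would stay in step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch (not Kafka's implementation): count records that a
// transformation chain drops, i.e. for which some transform returns null.
public class SkippedRecordsSketch {
    private long skippedTotal = 0; // stands in for *-record-skipped-total

    public List<String> processBatch(List<String> batch, List<UnaryOperator<String>> chain) {
        List<String> kept = new ArrayList<>();
        long skipped = 0;
        for (String record : batch) {
            String current = record;
            for (UnaryOperator<String> transform : chain) {
                current = transform.apply(current);
                if (current == null) {
                    break; // a dropped record is not passed to later transforms
                }
            }
            if (current == null) {
                skipped++;
            } else {
                kept.add(current);
            }
        }
        // Recorded in the same pass that decides what is kept, so no
        // subtraction of independently sampled sensors is needed.
        skippedTotal += skipped;
        return kept;
    }

    public long skippedTotal() {
        return skippedTotal;
    }

    public static void main(String[] args) {
        // Hypothetical chain: a filter that drops "tmp-" records, then an uppercasing step.
        List<UnaryOperator<String>> chain = List.of(
                r -> r.startsWith("tmp-") ? null : r,
                r -> r.toUpperCase());

        SkippedRecordsSketch metrics = new SkippedRecordsSketch();
        List<String> kept = metrics.processBatch(List.of("a", "tmp-b", "c", "tmp-d"), chain);
        System.out.println(kept);                   // prints [A, C]
        System.out.println(metrics.skippedTotal()); // prints 2
    }
}
```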
Compatibility, Deprecation, and Migration Plan
Since these are new metrics, no backward compatibility concerns are anticipated.
Test Plan
Add unit and integration tests to verify that the new metrics report the expected values.
Rejected Alternatives
We could continue relying on the existing metrics to derive the number of skipped records. While this mostly works for sink connectors, there will still be cases where the reported numbers are inaccurate.