Currently, Sink and Source Connectors include latency metrics covering only the time spent interacting with the external systems:

  • put-batch-latency metric measures the time to sink a batch of records, and
  • poll-batch-time metric measures the time to poll a batch of records from the external system.

At the moment it's difficult to understand other latency aspects, e.g.:

  • how much latency the connector has introduced in the process, or
  • how long after being written a record is processed.

In order to observe the connector's performance and measure its complete end-to-end latency from sources to sinks, this KIP proposes the following additional measurements:

  • In the source connector:
    • Transform and convert time: after polling, transformations and conversions happen before the records are sent to Kafka.
  • In the sink connector:
    • Record latency: wall-clock time minus record timestamp, to evaluate how late records are processed.
    • Convert and transform time before sending records to an external system


(*) New metrics

Source Connectors have the following stages:

  • task implementation polling records: gather batch of records from external system

  • transform: apply transformation chain individually
  • convert: convert records to ProducerRecord individually
  • send records: send records to Kafka topics individually

The stage for polling records already has a latency metric: poll-batch-time.

The transform-chain-source-record-time and convert-source-record-time metrics will measure the time taken by the transformations applied and by the conversion from SourceRecord into ProducerRecord.
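As a rough illustration of per-record stage timing, the sketch below wraps a single stage call (transform or convert) and accumulates the elapsed time. StageTimer and its methods are hypothetical names invented for this example, not Kafka Connect classes, and the clock is injected only so the behaviour can be checked deterministically.

```java
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical helper (not a Kafka Connect class): accumulates the time
// spent in one per-record stage, such as transform or convert.
final class StageTimer {
    private final LongSupplier nanoClock; // e.g. System::nanoTime
    private long totalNanos;
    private long count;

    StageTimer(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    // Runs the stage for one record and records how long it took,
    // even when the stage throws.
    <I, O> O time(Function<I, O> stage, I input) {
        long start = nanoClock.getAsLong();
        try {
            return stage.apply(input);
        } finally {
            totalNanos += nanoClock.getAsLong() - start;
            count++;
        }
    }

    long totalNanos() {
        return totalNanos;
    }

    long count() {
        return count;
    }
}
```

In a worker-like loop, one timer per stage (for instance `new StageTimer(System::nanoTime)`) would be invoked once per record, which is also why the per-record overhead of such metrics is worth testing.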

The stage for sending records can be monitored with Producer sender metrics, e.g. request-latency-avg/max

Sink Connectors have the following stages:

  • consumer polling record: gather batch of consumer records

  • convert: convert records individually to generic SinkRecord
  • transform: apply transformation chain
  • process: put record batches into an external system.

The processing stage already has a latency metric: put-batch-time

To measure sink-record-latency (i.e. processing time - event time), it's proposed to measure the difference between the record timestamp and the current system time (wall-clock) just before the convert stage, as that is when records are already being iterated.
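The record latency computation can be sketched as follows. RecordLatencyTracker is a hypothetical class written for illustration only, not the KIP's implementation; clamping negative values to zero (for records whose timestamp is ahead of the local clock) is an assumption of this sketch, not something the proposal specifies.

```java
import java.util.function.LongSupplier;

// Hypothetical helper (not a Kafka Connect class): tracks record latency
// as wall-clock time minus record timestamp, both in epoch milliseconds.
final class RecordLatencyTracker {
    private final LongSupplier wallClockMs; // e.g. System::currentTimeMillis
    private long count;
    private long totalMs;
    private long maxMs;

    RecordLatencyTracker(LongSupplier wallClockMs) {
        this.wallClockMs = wallClockMs;
    }

    // Intended to be called once per record, just before the convert stage.
    // Returns the latency observed for this record.
    long record(long recordTimestampMs) {
        // Assumption: clamp at zero so clock skew cannot yield negative latency.
        long latencyMs = Math.max(0L, wallClockMs.getAsLong() - recordTimestampMs);
        count++;
        totalMs += latencyMs;
        maxMs = Math.max(maxMs, latencyMs);
        return latencyMs;
    }

    double avgMs() {
        return count == 0 ? 0.0 : (double) totalMs / count;
    }

    long maxMs() {
        return maxMs;
    }
}
```

Because the value depends on the record timestamp, it is only as meaningful as the timestamps written by the producer and the clock synchronisation between hosts.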


Polling can be monitored with Consumer fetch metrics, e.g. fetch-latency-avg/max 


Predicates, as implemented via PredicatedTransformation, will also be measured.


The per-record metrics will definitely be added to Kafka Connect as part of this KIP, but their metric level will be changed pending the performance testing described in the linked ASF JIRA issue, and will otherwise only be exposed at a lower level (DEBUG instead of INFO, and TRACE instead of DEBUG).
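To make the effect of lowering a metric's level concrete, here is a minimal sketch of level-based gating. MetricLevel is a hypothetical enum defined for this example and only mirrors the INFO/DEBUG/TRACE ordering mentioned above; it is not the Kafka metrics API.

```java
// Hypothetical enum (not the Kafka metrics API): ordered from least to
// most verbose, mirroring the INFO/DEBUG/TRACE levels named in the text.
enum MetricLevel {
    INFO, DEBUG, TRACE;

    // A metric is recorded only if the configured recording level is at
    // least as verbose as the metric's own level.
    boolean shouldRecord(MetricLevel configured) {
        return this.ordinal() <= configured.ordinal();
    }

    // The "lower level" fallback: INFO becomes DEBUG, DEBUG becomes TRACE.
    MetricLevel lowered() {
        return this == INFO ? DEBUG : TRACE;
    }
}
```

Under this model, a per-record metric moved from INFO to DEBUG costs nothing on a worker left at the INFO recording level, and is only collected when an operator opts in to DEBUG or TRACE.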


ConnectConfig and TransformationChain users will have to migrate to the new interfaces. However, these APIs are used internally when the Worker instantiates Tasks and are not meant for external usage.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.