Status

Current state: Under Discussion

Discussion thread: here 

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

With the delivery of KIP-714, clients can push their metrics to the broker, where cluster operators can collect them via a broker-side plugin. KIP-714 dramatically simplifies observing client behavior through metrics. However, it only covers metrics from the clients themselves (admin, consumer, and producer) and leaves out applications that embed client instances, such as Kafka Streams. Capturing client metrics is clearly important, but a complete performance picture requires reviewing both the client metrics and the metrics of the enclosing application. This proposal provides a mechanism for applications containing Kafka clients to supplement the client metrics with application-specific metrics.

Public Interfaces

In this KIP, we propose adding a new method to the Admin, Consumer, and Producer interfaces. The KafkaAdminClient, KafkaConsumer, and KafkaProducer classes will implement this new method.

Proposed Changes

The method registerMetricsForSubscription will be added to the Admin, Consumer, and Producer interfaces.


Admin interface change
package org.apache.kafka.clients.admin;

import java.util.Collection;

import org.apache.kafka.common.Metric;

public interface Admin extends AutoCloseable {

    // ... existing methods ...

    /**
     * Application metrics provided for subscription.
     * These metrics will be added to this client's metrics
     * that are available for subscription and sent as telemetry data to the broker.
     *
     * @param metrics the application metrics to register
     */
    void registerMetricsForSubscription(Collection<? extends Metric> metrics);
}


Consumer interface change
package org.apache.kafka.clients.consumer;

import java.io.Closeable;
import java.util.Collection;

import org.apache.kafka.common.Metric;

public interface Consumer<K, V> extends Closeable {

    // ... existing methods ...

    /**
     * Application metrics provided for subscription.
     * These metrics will be added to this client's metrics
     * that are available for subscription and sent as telemetry data to the broker.
     *
     * @param metrics the application metrics to register
     */
    void registerMetricsForSubscription(Collection<? extends Metric> metrics);
}


Producer interface change
package org.apache.kafka.clients.producer;

import java.io.Closeable;
import java.util.Collection;

import org.apache.kafka.common.Metric;

public interface Producer<K, V> extends Closeable {

    // ... existing methods ...

    /**
     * Application metrics provided for subscription.
     * These metrics will be added to this client's metrics
     * that are available for subscription and sent as telemetry data to the broker.
     *
     * @param metrics the application metrics to register
     */
    void registerMetricsForSubscription(Collection<? extends Metric> metrics);
}
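To illustrate the intended semantics, the following self-contained sketch models a client that keeps a set of metrics eligible for subscription and merges application metrics into that set when registerMetricsForSubscription is called. The Metric interface, SimpleMetric, and TelemetryClient are hypothetical stand-ins: the real org.apache.kafka.common.Metric identifies a metric by a MetricName rather than a string, and the sketch ignores the actual telemetry push entirely.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class RegisterMetricsSketch {

    // Hypothetical stand-in for org.apache.kafka.common.Metric; the real interface
    // identifies a metric by a MetricName rather than a plain string.
    public interface Metric {
        String name();
        Object metricValue();
    }

    // Hypothetical metric implementation holding a fixed value.
    public record SimpleMetric(String name, Object metricValue) implements Metric { }

    // Hypothetical client that only tracks which metrics are eligible for subscription.
    public static final class TelemetryClient {
        private final Map<String, Metric> subscribable = new LinkedHashMap<>();

        public TelemetryClient(Collection<? extends Metric> clientMetrics) {
            clientMetrics.forEach(m -> subscribable.put(m.name(), m));
        }

        // Mirrors the proposed method: application metrics join the client's own
        // metrics in the set that a telemetry subscription can select from.
        public void registerMetricsForSubscription(Collection<? extends Metric> metrics) {
            metrics.forEach(m -> subscribable.put(m.name(), m));
        }

        public List<String> subscribableMetricNames() {
            return List.copyOf(subscribable.keySet());
        }
    }

    public static void main(String[] args) {
        TelemetryClient client = new TelemetryClient(
                List.of(new SimpleMetric("record-queue-time-max", 12.0)));
        client.registerMetricsForSubscription(
                List.of(new SimpleMetric("process-records-avg", 3.5)));
        System.out.println(client.subscribableMetricNames());
        // prints [record-queue-time-max, process-records-avg]
    }
}
```

The insertion-ordered map keeps the client's own metrics first, followed by the application metrics in registration order.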

Metrics naming

Since this KIP reuses the entire telemetry metrics pipeline of KIP-714, metric naming and formatting follow the same process and rules defined in KIP-714.

The following table illustrates the derivation of the telemetry metric names from Kafka Streams metric names:

Kafka Streams metric name | Telemetry metric name
"process-records-avg, group=stream-thread-metrics" | "org.apache.kafka.stream.thread.process.records.avg"
"commit-latency-max, group=stream-thread-metrics" | "org.apache.kafka.stream.thread.commit.latency.max"

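The derivation in the table can be sketched as follows, assuming the KIP-714 convention: strip the "-metrics" suffix from the group name, join the group and metric name with a dot, replace dashes with dots, and prefix the result with "org.apache.kafka.". The class and method names are hypothetical, and the sketch only aims to reproduce the two rows in the table.

```java
public final class TelemetryNameSketch {
    private static final String PREFIX = "org.apache.kafka.";
    private static final String GROUP_SUFFIX = "-metrics";

    // Hypothetical helper: derives a telemetry name from a metric group and metric name.
    public static String telemetryName(String group, String name) {
        // Drop the "-metrics" suffix, e.g. "stream-thread-metrics" -> "stream-thread".
        String trimmedGroup = group.endsWith(GROUP_SUFFIX)
                ? group.substring(0, group.length() - GROUP_SUFFIX.length())
                : group;
        // Join group and name, then turn every dash into a dot.
        return PREFIX + (trimmedGroup + "." + name).replace('-', '.');
    }

    public static void main(String[] args) {
        System.out.println(telemetryName("stream-thread-metrics", "process-records-avg"));
        // prints org.apache.kafka.stream.thread.process.records.avg
        System.out.println(telemetryName("stream-thread-metrics", "commit-latency-max"));
        // prints org.apache.kafka.stream.thread.commit.latency.max
    }
}
```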
Metric format

Since this KIP is an extension of KIP-714, it follows the same approach for supported metric types. When metrics are provided via registerMetricsForSubscription, only metrics of type sum (a monotonic counter, like process-total) and gauge (a non-monotonic value, like process-rate) are registered; metrics of any other type are dropped, and a WARN message is logged.
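A minimal sketch of this filtering rule is shown below. The MetricType enum and AppMetric record are hypothetical: in the real client the type would be derived from the metric itself, and the warning would go through the client's logger rather than java.util.logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

public final class MetricTypeFilterSketch {
    private static final Logger LOG = Logger.getLogger(MetricTypeFilterSketch.class.getName());

    // Hypothetical classification of an application metric.
    public enum MetricType { SUM, GAUGE, HISTOGRAM }

    // Hypothetical stand-in for an application metric with a known type.
    public record AppMetric(String name, MetricType type) { }

    // Keeps sum and gauge metrics; drops everything else with a WARN-level log message.
    public static List<AppMetric> filterSupported(List<AppMetric> candidates) {
        List<AppMetric> accepted = new ArrayList<>();
        for (AppMetric metric : candidates) {
            if (metric.type() == MetricType.SUM || metric.type() == MetricType.GAUGE) {
                accepted.add(metric);
            } else {
                LOG.warning("Dropping metric " + metric.name()
                        + " of unsupported type " + metric.type());
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<AppMetric> kept = filterSupported(List.of(
                new AppMetric("process-total", MetricType.SUM),
                new AppMetric("process-rate", MetricType.GAUGE),
                new AppMetric("example-histogram", MetricType.HISTOGRAM)));
        System.out.println(kept.size()); // prints 2
    }
}
```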

Configuration

With KIP-714, clients use the ENABLE_METRICS_PUSH_CONFIG configuration to enable pushing metrics. Kafka Streams also has an ENABLE_METRICS_PUSH_CONFIG configuration, which is meant to govern whether its internal clients push metrics as well.

Kafka Streams, however, is in a unique position, as it is itself a client application that embeds other clients. As a consequence, the metrics enablement settings of Kafka Streams and of its internal clients can conflict. Below is a table of configuration possibilities:

Metrics enabled | Kafka Streams config action | Client config action
Kafka Streams = yes, clients = yes | Nothing, enabled by default | Nothing, enabled by default
Kafka Streams = no, clients = yes | ENABLE_METRICS_PUSH_CONFIG = false | main.consumer.ENABLE_METRICS_PUSH_CONFIG = true; producer.ENABLE_METRICS_PUSH_CONFIG = true
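As an illustration of the second row of the table, the following sketch builds the Kafka Streams properties with java.util.Properties. It assumes that the property key behind ENABLE_METRICS_PUSH_CONFIG is enable.metrics.push (the KIP-714 client config name) and that the main.consumer. and producer. prefixes route the override to the corresponding internal clients, following the existing Kafka Streams client-prefix convention. The class and method names are hypothetical.

```java
import java.util.Properties;

public final class MetricsPushConfigSketch {
    // Assumed key: the KIP-714 client config name behind ENABLE_METRICS_PUSH_CONFIG.
    static final String ENABLE_METRICS_PUSH = "enable.metrics.push";
    // Assumed prefixes, following the Kafka Streams client-prefix convention.
    static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
    static final String PRODUCER_PREFIX = "producer.";

    // Kafka Streams = no, clients = yes.
    public static Properties streamsOffClientsOn() {
        Properties props = new Properties();
        props.setProperty(ENABLE_METRICS_PUSH, "false");
        props.setProperty(MAIN_CONSUMER_PREFIX + ENABLE_METRICS_PUSH, "true");
        props.setProperty(PRODUCER_PREFIX + ENABLE_METRICS_PUSH, "true");
        return props;
    }

    public static void main(String[] args) {
        streamsOffClientsOn().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The unprefixed entry turns pushing off for Kafka Streams (and, assuming the usual pass-through of unprefixed client configs, for any internal client without a prefixed override), while the prefixed entries re-enable it for the main consumer and the producer only.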

It is important to note that, since Kafka Streams pushes its metrics through the client metrics mechanism of its internal clients, disabling metrics for those internal clients also makes it impossible to push Kafka Streams metrics.

To alert the user to this significant configuration mismatch, a Kafka Streams application configured with ENABLE_METRICS_PUSH_CONFIG=true whose main consumer or admin client is configured with ENABLE_METRICS_PUSH_CONFIG=false will fail with a ConfigException on start-up.
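The start-up check could look roughly like the sketch below. It is a hypothetical helper, not the actual Kafka Streams code: ConfigException is a local stand-in for org.apache.kafka.common.config.ConfigException, the key and prefixes (enable.metrics.push, main.consumer., admin.) are assumptions, and other prefixes such as the generic consumer. prefix are ignored for brevity.

```java
import java.util.Properties;

public final class MetricsPushValidationSketch {
    // Assumed key for ENABLE_METRICS_PUSH_CONFIG.
    static final String ENABLE_METRICS_PUSH = "enable.metrics.push";

    // Local stand-in for org.apache.kafka.common.config.ConfigException.
    public static final class ConfigException extends RuntimeException {
        public ConfigException(String message) {
            super(message);
        }
    }

    // Effective client setting: the prefixed override if present, otherwise the
    // unprefixed Kafka Streams value, otherwise the KIP-714 default of true.
    static boolean effectiveClientSetting(Properties props, String prefix) {
        String streamsValue = props.getProperty(ENABLE_METRICS_PUSH, "true");
        return Boolean.parseBoolean(props.getProperty(prefix + ENABLE_METRICS_PUSH, streamsValue));
    }

    // Rejects Kafka Streams = enabled combined with a disabled main consumer or admin client.
    public static void validate(Properties props) {
        boolean streamsEnabled = Boolean.parseBoolean(props.getProperty(ENABLE_METRICS_PUSH, "true"));
        if (!streamsEnabled) {
            return; // Nothing to push for Kafka Streams, so no mismatch is possible.
        }
        for (String prefix : new String[] {"main.consumer.", "admin."}) {
            if (!effectiveClientSetting(props, prefix)) {
                throw new ConfigException("Kafka Streams has " + ENABLE_METRICS_PUSH + "=true but "
                        + prefix + ENABLE_METRICS_PUSH + "=false, so Kafka Streams metrics cannot be pushed");
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("main.consumer." + ENABLE_METRICS_PUSH, "false");
        try {
            validate(props);
        } catch (ConfigException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```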

Compatibility, Deprecation, and Migration Plan

Since this KIP only adds a new method and the behavior is opt-in, no backward compatibility concerns are anticipated.

Note that the minimum broker version for telemetry reporting is 3.7. When 3.7+ clients and Kafka Streams are used with a pre-3.7 broker, no telemetry is collected and registerMetricsForSubscription acts as a no-op. No action is required from the user in this case.

Test Plan

Unit and integration tests will be added to ensure correct behavior.

Rejected Alternatives

We could have used a new NetworkClient object inside Kafka Streams to connect to a broker and push Kafka Streams metrics. However, this approach is a workaround rather than a general-purpose solution: it would only solve the issue for Kafka Streams, not for third-party applications. Additional connections to a broker also incur a cost, so we abandoned this idea.