Status

Current state: Accepted

Discussion thread: here 

Vote thread: here

JIRA: KAFKA-19773 

Motivation

KIP-714 introduced the ClientTelemetryReceiver interface to enable MetricsReporter instances running on brokers to collect client telemetry metrics. Typically, a reporter stores the metrics it receives in a "metrics registry" and exposes them in a format suitable for the external monitoring system it is built for. This is exactly what JmxReporter does. Kafka clients and brokers explicitly delete their metrics when they are no longer used. However, there is no explicit deletion mechanism for client telemetry metrics.

Clients send their telemetry metrics at a regular interval, configured via telemetry subscriptions. Reporters are not aware of this push interval. Without the interval or an explicit deletion mechanism, reporters cannot tell when to clear client telemetry metrics and stop exposing them, which makes it hard to avoid exposing stale metrics. Uncertainty in metrics is a major problem because operators rely on them to manage their clusters and investigate issues.

This KIP proposes updating the context associated with each client telemetry push to include the push interval. This will enable reporters to properly manage the life cycle of client telemetry metrics.

Public Interfaces

  • A new interface, ClientTelemetryContext, to hold the request context and the telemetry context (currently just the push interval):

    package org.apache.kafka.server.telemetry;
    
    import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
    
    /**
     * Context provided to {@link ClientTelemetryExporter} implementations when receiving client metrics.
     */
    public interface ClientTelemetryContext {
    
        /**
         * The interval defined via <code>interval.ms</code> in the client metrics subscription.
         * @return The interval in milliseconds
         */
        int pushIntervalMs();
    
        /**
         * The context associated with this request
         * @return The AuthorizableRequestContext associated with this request
         */
        AuthorizableRequestContext authorizableRequestContext();
    }
  • A new interface, ClientTelemetryExporter, that works like ClientTelemetryReceiver but receives a ClientTelemetryContext instead of an AuthorizableRequestContext:

    package org.apache.kafka.server.telemetry;
    
    /**
     * {@code ClientTelemetryExporter} defines the behaviour of a telemetry exporter on the broker side
     * which receives client telemetry metrics.
     */
    public interface ClientTelemetryExporter {
    
        /**
         * Called by the broker when a client reports telemetry metrics. The associated telemetry context
         * can be used by the metrics plugin to retrieve additional client information such as client ids,
         * endpoints or the push interval.
         * <p>
         * This method may be called from the request handling thread, and as such should avoid blocking.
         *
         * @param context the client telemetry context for the corresponding {@code PushTelemetryRequest}
         *                api call.
         * @param payload the encoded telemetry payload as sent by the client.
         */
        void exportMetrics(ClientTelemetryContext context, ClientTelemetryPayload payload);
    }
  • A new interface, ClientTelemetryExporterProvider, that works like ClientTelemetry but provides a ClientTelemetryExporter instance instead of a ClientTelemetryReceiver:

    package org.apache.kafka.server.telemetry;
    
    import org.apache.kafka.common.metrics.MetricsReporter;
    
    /**
     * A {@link MetricsReporter} may implement this interface to indicate support for collecting client
     * telemetry on the server side.
     */
    public interface ClientTelemetryExporterProvider {
    
        /**
         * Called by the broker to fetch an instance of {@link ClientTelemetryExporter}.
         * <p>
         * This instance may be cached by the broker.
         *
         * @return the broker-side instance of {@link ClientTelemetryExporter}.
         */
        ClientTelemetryExporter clientTelemetryExporter();
    
    }
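The push interval is what lets a reporter decide when a client's metrics have gone stale. The sketch below shows one way a reporter could use the interfaces above. The exporter records a deadline per client instance, set a few push intervals after the latest push, and drops clients whose deadline has passed. The reporter returns one shared exporter because the broker may cache it. To stay self-contained, it declares simplified stand-ins for the Kafka types: ClientTelemetryPayload is reduced to clientInstanceId() and isTerminating(), with a String in place of Uuid, and MetricsReporter is omitted. The class names, the grace factor of 2 and the injected clock are assumptions for illustration, not part of this KIP.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Simplified stand-ins for the Kafka types (trimmed to what this sketch uses).
interface ClientTelemetryContext {
    int pushIntervalMs();
}

interface ClientTelemetryPayload {
    String clientInstanceId(); // a Uuid in Kafka
    boolean isTerminating();   // true for the final push sent when the client closes
}

interface ClientTelemetryExporter {
    void exportMetrics(ClientTelemetryContext context, ClientTelemetryPayload payload);
}

interface ClientTelemetryExporterProvider {
    ClientTelemetryExporter clientTelemetryExporter();
}

// Hypothetical exporter that tracks when each client's metrics become stale.
final class ExpiringTelemetryExporter implements ClientTelemetryExporter {
    // Assumption: metrics stay exposed for two push intervals after the latest push.
    static final int GRACE_FACTOR = 2;

    private final LongSupplier clockMs;
    // Client instance id -> time (ms) at which its metrics are considered stale.
    private final Map<String, Long> staleAtMs = new ConcurrentHashMap<>();

    ExpiringTelemetryExporter(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    @Override
    public void exportMetrics(ClientTelemetryContext context, ClientTelemetryPayload payload) {
        String id = payload.clientInstanceId();
        if (payload.isTerminating()) {
            // The client is closing, so its metrics can be dropped immediately.
            staleAtMs.remove(id);
            return;
        }
        // A real exporter would also decode and store the payload here. This may run on a
        // request handler thread, so the bookkeeping is kept non-blocking.
        staleAtMs.put(id, clockMs.getAsLong() + (long) GRACE_FACTOR * context.pushIntervalMs());
    }

    // Drops clients whose deadline has passed and returns how many were dropped.
    int expireStale() {
        long now = clockMs.getAsLong();
        int dropped = 0;
        for (Map.Entry<String, Long> entry : staleAtMs.entrySet()) {
            // Conditional remove, so a client refreshed concurrently is not dropped.
            if (entry.getValue() <= now && staleAtMs.remove(entry.getKey(), entry.getValue())) {
                dropped++;
            }
        }
        return dropped;
    }

    boolean isExposed(String clientInstanceId) {
        return staleAtMs.containsKey(clientInstanceId);
    }
}

// Hypothetical reporter; in Kafka it would also implement MetricsReporter.
final class ExampleReporter implements ClientTelemetryExporterProvider {
    // Created once and returned on every call, which is safe if the broker caches it.
    private final ExpiringTelemetryExporter exporter =
            new ExpiringTelemetryExporter(System::currentTimeMillis);

    @Override
    public ClientTelemetryExporter clientTelemetryExporter() {
        return exporter;
    }
}
```

Expiring after a multiple of the interval, rather than after exactly one, leaves room for a push that arrives slightly late, so a healthy client's metrics do not flicker in and out. In this sketch, something owned by the reporter is assumed to call expireStale() periodically.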

Proposed Changes

  • Deprecate the existing ClientTelemetryReceiver and ClientTelemetry interfaces and recommend that users use ClientTelemetryExporter and ClientTelemetryExporterProvider, respectively, instead.
  • Brokers will support both ClientTelemetry and ClientTelemetryExporterProvider implementations, and call the appropriate methods depending on the type of class provided.
  • If a class implements both ClientTelemetry and ClientTelemetryExporterProvider, only the methods from ClientTelemetryExporterProvider, the new interface, will be used.
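The selection rule above can be sketched as follows. This illustrates the rule only and is not the broker's actual code. The stand-in interfaces are trimmed to the methods involved; the older ones follow KIP-714's ClientTelemetry.clientReceiver() and ClientTelemetryReceiver.exportMetrics(AuthorizableRequestContext, ClientTelemetryPayload) with simplified types. TelemetryRoute, BrokerTelemetryDispatch and DualReporter are hypothetical names.

```java
// Simplified stand-ins (trimmed to what this sketch uses).
interface AuthorizableRequestContext {
    String clientId();
}

interface ClientTelemetryPayload {
    String clientInstanceId();
}

interface ClientTelemetryContext {
    int pushIntervalMs();
    AuthorizableRequestContext authorizableRequestContext();
}

// Existing interfaces from KIP-714, deprecated by this KIP.
interface ClientTelemetryReceiver {
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);
}

interface ClientTelemetry {
    ClientTelemetryReceiver clientReceiver();
}

// New interfaces from this KIP.
interface ClientTelemetryExporter {
    void exportMetrics(ClientTelemetryContext context, ClientTelemetryPayload payload);
}

interface ClientTelemetryExporterProvider {
    ClientTelemetryExporter clientTelemetryExporter();
}

enum TelemetryRoute { EXPORTER, RECEIVER, NONE }

// Hypothetical dispatch helper illustrating the selection rule.
final class BrokerTelemetryDispatch {
    static TelemetryRoute dispatch(Object reporter, ClientTelemetryContext context,
                                   ClientTelemetryPayload payload) {
        // Checked first, so a class implementing both interfaces only uses the new one.
        if (reporter instanceof ClientTelemetryExporterProvider) {
            ((ClientTelemetryExporterProvider) reporter)
                    .clientTelemetryExporter()
                    .exportMetrics(context, payload);
            return TelemetryRoute.EXPORTER;
        }
        if (reporter instanceof ClientTelemetry) {
            // Legacy path: the receiver only gets the request context, not the push interval.
            ((ClientTelemetry) reporter)
                    .clientReceiver()
                    .exportMetrics(context.authorizableRequestContext(), payload);
            return TelemetryRoute.RECEIVER;
        }
        return TelemetryRoute.NONE;
    }
}

// Hypothetical reporter implementing both interfaces; only the exporter should be called.
final class DualReporter implements ClientTelemetry, ClientTelemetryExporterProvider {
    int receiverCalls = 0;
    int exporterCalls = 0;

    @Override
    public ClientTelemetryReceiver clientReceiver() {
        return (context, payload) -> receiverCalls++;
    }

    @Override
    public ClientTelemetryExporter clientTelemetryExporter() {
        return (context, payload) -> exporterCalls++;
    }
}
```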

Compatibility, Deprecation, and Migration Plan

The existing interfaces, ClientTelemetryReceiver and ClientTelemetry, are kept and deprecated. They will be removed in the next major version, Kafka 5.0.0. All existing implementations will keep working until then.

To receive the push interval, implementations need to implement the new interfaces, ClientTelemetryExporterProvider and ClientTelemetryExporter.

Test Plan

Adequate unit and integration tests will be added to ensure that existing ClientTelemetry/ClientTelemetryReceiver implementations keep working and that implementations of the new interfaces work as well.

Rejected Alternatives

  • Add an overload of ClientTelemetryReceiver.exportMetrics() that accepts ClientTelemetryContext: This would require implementations that want to use ClientTelemetryContext to still implement the old ClientTelemetryReceiver.exportMetrics() method. If the old method were removed in the future, implementations would also need to be recompiled at that time.
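For illustration, the rejected design would look roughly like the sketch below, assuming the new overload were added as a default method that falls back to the old one. It uses simplified stand-in types and a hypothetical IntervalAwareReceiver class, and is not code from Kafka. Because the original method stays abstract, a plugin that only wants the push interval still has to implement it.

```java
// Simplified stand-ins (trimmed to what this sketch uses).
interface AuthorizableRequestContext {
    String clientId();
}

interface ClientTelemetryPayload {
    String clientInstanceId();
}

interface ClientTelemetryContext {
    int pushIntervalMs();
    AuthorizableRequestContext authorizableRequestContext();
}

// Rejected shape: the new overload sits next to the existing abstract method.
interface ClientTelemetryReceiver {
    // Existing method: remains abstract, so every implementation must still provide it.
    void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload);

    // New overload: by default it drops the push interval and falls back to the old method.
    default void exportMetrics(ClientTelemetryContext context, ClientTelemetryPayload payload) {
        exportMetrics(context.authorizableRequestContext(), payload);
    }
}

// A plugin that only wants the new overload still has to carry the old method.
final class IntervalAwareReceiver implements ClientTelemetryReceiver {
    int lastIntervalMs = -1;

    @Override
    public void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPayload payload) {
        // Unused by this plugin, but required to compile against the rejected design.
    }

    @Override
    public void exportMetrics(ClientTelemetryContext context, ClientTelemetryPayload payload) {
        lastIntervalMs = context.pushIntervalMs();
    }
}
```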