
Status

Current state: Draft

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Client configuration represents the number one pain point hindering organizations’ adoption and/or expansion of Kafka. The large number of available configuration options bewilders teams new to the ecosystem, and even accomplished teams are not immune to the occasional misconfiguration. Additionally, the teams developing and deploying client applications in an organization are often separate and intentionally siloed from the teams that manage the organization's Kafka cluster (the most extreme case being management via a hosted service). This complexity unnecessarily delays projects and contributes to operational overhead, as configuration tuning often requires specialized expertise.

As Kafka’s usage increases within organizations, centralized observability of client configuration becomes increasingly critical. This KIP takes the first step in solving this issue by introducing a standard interface by which clients can push their configuration to the cluster, thus providing the cornerstone for observability and troubleshooting.

Similar to previous initiatives (e.g. KIP-714), this KIP makes the mechanism opt-in on the broker and opt-out on the client. It is strongly advised that this KIP be implemented and enabled by default in all Kafka client libraries, so that its functionality is available without requiring direct interaction from cluster operators. Achieving this will boost developer productivity, shorten project deployment time, and reduce operational overhead, thereby lowering the barrier to adoption and expansion of Kafka.

Public Interfaces

This KIP adds a new RPC named PushConfig to the Kafka protocol. PushConfig forms a handshake between the client and broker by which the client sends its configuration (keys and values) to the broker. Brokers delegate the processing of each PushConfig request to a new ClientConfigPolicy interface.

Concepts

This section introduces key concepts used throughout this KIP.

Client Instance ID

ClientInstanceId was introduced in KIP-714. It is a UUID version 4 value that uniquely identifies a client instance. The ID is not a secret or token, but a value with which brokers can correlate requests from the same client. If a client and broker support the features from both KIP-714 and this KIP, the client must use the same ClientInstanceId for both configuration and telemetry. A client generates its ClientInstanceId on startup, before any network activity, and keeps it in memory for the lifetime of the client process; a new ClientInstanceId is generated each time the client restarts. The ID is tied to the client, not its connections: a client uses the same ID for all broker connections. Brokers receive the same ClientInstanceId in all ApiVersions requests from a given client but do not coordinate, validate, or track its origin; they implicitly trust the ID. The value is stored in the RequestContext alongside ClientSoftwareName and ClientSoftwareVersion.
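A minimal sketch of the ID lifecycle described above, assuming a hypothetical ClientInstanceIdHolder class owned by each client instance (not by the process, since one process may host several clients). It uses java.util.UUID.randomUUID(), which produces version 4 UUIDs; Kafka's own code would likely use its org.apache.kafka.common.Uuid type for the protocol's uuid field instead.

```java
import java.util.UUID;

/**
 * Hypothetical holder for a client's instance ID. One random (version 4) UUID is
 * generated when the client is created, before any network activity, kept only in
 * memory, and reused for every broker connection that client opens.
 */
final class ClientInstanceIdHolder {
    // Generated once per client instance; a restarted client creates a new holder and a new ID
    private final UUID clientInstanceId = UUID.randomUUID();

    /** Returns the same ID for every connection (ApiVersions, telemetry, configuration push). */
    UUID clientInstanceId() {
        return clientInstanceId;
    }
}
```

Because the ID lives in the client object rather than in a connection object, reconnects naturally reuse it.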

Sensitive Configuration

Some configuration values are sensitive in that they may leak credentials, PII, or other information that could violate an organization’s data policies. For this reason, each client defines a fixed default set of configuration keys whose values are sent to brokers. This set is carefully vetted to ensure no sensitive data is leaked. Most non-sensitive configuration is numeric (e.g. linger.ms), boolean (e.g. enable.auto.commit), or one of a fixed set of enums (e.g. share.acquire.mode).

If the default set of non-sensitive configuration is still too sensitive for an organization, users can override it with the new config.push.allowed.keys configuration, a comma-separated list of configuration keys to send instead. config.push.allowed.keys itself is not sent to the broker unless it is explicitly included in the override value.
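The override rule above can be sketched as follows, assuming a hypothetical PushKeyResolver helper that receives the user's configuration as a string map. The override replaces the default list outright, and config.push.allowed.keys is only pushed if the user lists it. The resolved keys still go through the sensitivity checks described later in this section.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that decides which configuration keys a client pushes. */
final class PushKeyResolver {
    static final String ALLOWED_KEYS_CONFIG = "config.push.allowed.keys";

    /**
     * Returns the user's override when config.push.allowed.keys is set,
     * otherwise the client's built-in default list.
     */
    static List<String> resolveKeys(Map<String, String> userConfig, List<String> defaultKeys) {
        String override = userConfig.get(ALLOWED_KEYS_CONFIG);
        if (override == null) {
            return defaultKeys; // the default list never contains config.push.allowed.keys itself
        }
        // Comma-separated list: trim whitespace, drop empty entries and duplicates
        return Arrays.stream(override.split(","))
                .map(String::trim)
                .filter(key -> !key.isEmpty())
                .distinct()
                .collect(Collectors.toList());
    }
}
```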

Default Configuration for Apache Kafka Java Producer

By default, the Producer sends values for the following configuration keys:

  • acks

  • batch.size

  • buffer.memory

  • client.id

  • compression.type

  • delivery.timeout.ms

  • enable.idempotence

  • linger.ms

  • max.in.flight.requests.per.connection

  • request.timeout.ms

  • retries

Most of these values are numeric or one of a fixed set of enums, which greatly limits potential security exposure.

Default Configuration for Apache Kafka Java Consumer

By default, the Consumer sends values for the following configuration keys:

  • auto.offset.reset

  • client.id

  • enable.auto.commit

  • fetch.min.bytes

  • fetch.max.wait.ms

  • group.id

  • isolation.level

  • max.poll.interval.ms

  • max.poll.records

  • session.timeout.ms

Most of these values are numeric or one of a fixed set of enums, which greatly limits potential security exposure.

Default Configuration for Apache Kafka Java Share Consumer

By default, the Share Consumer sends values for the following configuration keys:

  • client.id
  • fetch.max.wait.ms
  • fetch.min.bytes
  • group.id
  • max.poll.interval.ms
  • max.poll.records
  • share.acknowledgement.mode
  • share.acquire.mode

All configuration keys, whether from the default set or from config.push.allowed.keys, are then checked to ensure they are not sensitive. A configuration key is considered sensitive if it meets any of the following criteria:

  • Security-related

  • Class or implementation names (in the Java reference implementation, where ConfigDef.Type equals CLASS)

  • Passwords (in the Java reference implementation, where ConfigDef.Type equals PASSWORD)

  • Custom configuration (e.g. keys used by the application, SerDes, etc.), since Kafka cannot determine whether they contain sensitive information

  • All configuration keys starting with sasl., security., or ssl.

  • All configuration keys containing .sasl., .security., or .ssl.

  • All configuration keys ending in .class or .classes

If a configuration key (either from the default set or from config.push.allowed.keys) meets any of the above criteria, the client logs a warning message and does not send the value for that key to the broker. If all configuration is deemed sensitive, the client does not send anything configuration-related to the broker (i.e. it does not send a request with an empty set of configuration).
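A minimal client-side sketch of these criteria, under stated assumptions: the nested Type enum is a stand-in that mirrors the constant names of ConfigDef.Type, the knownTypes map stands in for the client's ConfigDef (a key absent from it is treated as custom configuration), and java.util.logging replaces the logging framework a real client would use. SensitiveConfigFilter is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical client-side filter applying the sensitivity criteria to resolved keys. */
final class SensitiveConfigFilter {
    private static final Logger LOG = Logger.getLogger(SensitiveConfigFilter.class.getName());

    /** Stand-in mirroring the constant names of ConfigDef.Type. */
    enum Type { BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD }

    private static final List<String> MARKERS = List.of("sasl.", "security.", "ssl.");

    /** type is null when the key is not a configuration the client itself defines. */
    static boolean isSensitive(String key, Type type) {
        if (type == null || type == Type.CLASS || type == Type.PASSWORD) {
            return true; // custom configuration, class names, and passwords
        }
        for (String marker : MARKERS) {
            // Prefix rule (e.g. ssl.) and infix rule (e.g. .ssl.)
            if (key.startsWith(marker) || key.contains("." + marker)) {
                return true;
            }
        }
        return key.endsWith(".class") || key.endsWith(".classes");
    }

    /** Returns the entries safe to push; an empty result means no PushConfig request is sent. */
    static Map<String, String> filter(List<String> keys, Map<String, String> values,
                                      Map<String, Type> knownTypes) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String key : keys) {
            if (isSensitive(key, knownTypes.get(key))) {
                LOG.warning("Not pushing sensitive configuration key " + key);
                continue;
            }
            result.put(key, values.get(key));
        }
        return result;
    }
}
```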

Client libraries that use a different naming convention for their configuration should adjust their exclusion logic appropriately.

Client Configuration Policy

This KIP introduces a new broker-side (Java) interface named ClientConfigPolicy. This interface processes (stores, etc.) the client configuration pushed to the broker. Apache Kafka operators can install and configure a ClientConfigPolicy implementation to suit their specific needs using the configuration described later.

End-to-end Flow

The following sequence diagram depicts the flow between the various entities:

The flow includes the following steps:

  1. The client establishes a connection to the broker

  2. The client sends an ApiVersions request

  3. If client.config.policy.class.name is configured, the broker advertises support for the PushConfig RPC in the ApiVersions response

  4. The client collects the configuration values, constructs a PushConfig request, and sends it to the broker.

  5. The broker validates the PushConfig request and invokes ClientConfigPolicy.process() to handle the configuration.

    1. In this example, the implementation writes the configuration snapshot to external storage for observability.

  6. ClientConfigPolicy.process() completes successfully.

  7. The broker returns a successful PushConfig response. The client completes initialization and is ready for user API calls (e.g., send(), poll()).

ClientConfigData

When the broker receives a PushConfig request, it creates a ClientConfigData instance that includes the configuration values (ClientConfig) sent by the client. The broker also generates a UTC timestamp representing when the request was received and includes it in the ClientConfigData.

package org.apache.kafka.server.policy.clientconfig;

import org.apache.kafka.common.config.ConfigDef;

/**
 * Record containing an individual client configuration entry.
 */
public record ClientConfig(String key, Object value, ConfigDef.Type type, boolean isDefault) {}


package org.apache.kafka.server.policy.clientconfig;

import java.util.List;

/**
 * Record containing the PushConfig API data.
 *
 * <p/>
 *
 * The client configuration values come directly from the RPC. The broker
 * will supply its current timestamp for the value of the same name.
 */
public record ClientConfigData(List<ClientConfig> configs, long timestamp) {}

ClientConfigPolicy

The broker exposes a plugin interface named ClientConfigPolicy that provides the API for processing the configuration sent by the client. The broker invokes this interface when it handles a PushConfig RPC.

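package org.apache.kafka.server.policy.clientconfig;

import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
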
/**
 * An interface for intercepting and enforcing client configuration.
 *
 * <p/>
 *
 * If <code>client.config.policy.class.name</code> is defined, Kafka will
 * create an instance of the specified class using the default constructor and
 * will then pass the broker configs to its <code>configure()</code> method.
 * During broker shutdown, the <code>close()</code> method will be invoked
 * so that resources can be released (if necessary).
 */
@InterfaceStability.Evolving
public interface ClientConfigPolicy extends Reconfigurable, AutoCloseable {
  
  /**
   * Receive the {@link ClientConfigData} pushed by a client for observability.
   * <p/>
   * <em>Note 1</em>: the implementation of this method must not block.
   * <p/>
   * <em>Note 2</em>: this method will <em>not</em> be invoked if the PushConfig
   * request was larger than {@code client.config.max.bytes}.
   *
   * @param context        the context of the request that carried the configuration
   * @param pushConfigData the pushed configuration entries and the broker's receive timestamp
   */
  void process(AuthorizableRequestContext context, ClientConfigData pushConfigData);
}
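Note 1 requires process() not to block. One way to honor that is sketched below, assuming simplified stand-in records instead of ClientConfigData and AuthorizableRequestContext (a real plugin would implement ClientConfigPolicy). The request path only offers the snapshot to a bounded queue, and a single background thread does the slow storage work. Dropping snapshots when the queue is full is a design choice of this sketch, not something the KIP specifies.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for ClientConfig / ClientConfigData (request context omitted)
record ClientConfigEntry(String key, String value, boolean isDefault) {}
record ClientConfigSnapshot(String clientInstanceId, List<ClientConfigEntry> configs, long timestamp) {}

/** Sketch of a policy whose process() never blocks the request-handling thread. */
final class QueueingConfigPolicy implements AutoCloseable {
    private final BlockingQueue<ClientConfigSnapshot> queue;
    private final AtomicLong dropped = new AtomicLong();
    private final Thread writer;

    QueueingConfigPolicy(int capacity, Consumer<ClientConfigSnapshot> sink) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.writer = new Thread(() -> {
            try {
                while (true) {
                    ClientConfigSnapshot snapshot = queue.take(); // waits here, off the request path
                    try {
                        sink.accept(snapshot); // slow I/O (e.g. external storage) happens here
                    } catch (RuntimeException e) {
                        System.err.println("Failed to store client config: " + e);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // close() was called
            }
        }, "client-config-writer");
        this.writer.setDaemon(true);
        this.writer.start();
    }

    /** Request-path entry point: offer() never blocks; the snapshot is dropped if the queue is full. */
    void process(ClientConfigSnapshot snapshot) {
        if (!queue.offer(snapshot)) {
            dropped.incrementAndGet();
        }
    }

    long droppedCount() {
        return dropped.get();
    }

    @Override
    public void close() {
        writer.interrupt();
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```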

ClientConfigTooLargeException

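The broker checks the size of the configuration data sent via the PushConfig RPC. If it is larger than client.config.max.bytes, the broker does not invoke the ClientConfigPolicy and instead returns the CONFIG_TOO_LARGE error, which corresponds to the following exception: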

package org.apache.kafka.common.errors;

/**
 * This exception indicates that the size of the client configuration data exceeded the
 * broker's client.config.max.bytes configuration.
 */
public class ClientConfigTooLargeException extends ApiException {

    public ClientConfigTooLargeException(String message) {
        super(message);
    }
}

Configuration

Broker

Setting client.config.policy.class.name to null disables the feature on the broker.

  • client.config.policy.class.name: The client configuration policy class. The class must implement the org.apache.kafka.server.policy.clientconfig.ClientConfigPolicy interface. Type: class. Default: null.

  • client.config.max.bytes: Maximum size, in bytes, of the configuration payload in a PushConfig request. Type: int. Default: 10240 (10 KB).

Client

Applies to the KafkaProducer, KafkaConsumer, and KafkaAdminClient clients, as well as Kafka Streams.

  • enable.config.push: Controls whether the client performs the one-time configuration handshake (PushConfig) when it first connects to the cluster. Type: boolean. Default: true.

  • config.push.allowed.keys: A comma-separated list of configuration keys that replaces the client's default set of keys to send. Keys in this list are still subject to the sensitivity checks. Type: list. Default: null (the client's default set is used).
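For illustration, a producer configuration using both client settings might look like the sketch below. The broker address and the chosen keys are placeholders, and ConfigPushClientSettings is a hypothetical helper; plain string keys are used because this KIP does not name any constants for the new settings.

```java
import java.util.Properties;

/** Hypothetical helper showing the client settings added by this KIP in a producer config. */
final class ConfigPushClientSettings {
    static Properties producerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Defaults to true; set to "false" to skip the PushConfig handshake entirely
        props.put("enable.config.push", "true");
        // Replaces the client's default key list; the listed keys are still checked for sensitivity
        props.put("config.push.allowed.keys", "acks,linger.ms,batch.size");
        return props;
    }
}
```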

Protocol

ApiVersionsRequest

{
  "apiKey": 18,
  "type": "request",
  "listeners": ["broker", "controller"],
  "name": "ApiVersionsRequest",
  // Versions 0 through 2 of ApiVersionsRequest are the same.
  //
  // Version 3 is the first flexible version and adds ClientSoftwareName and ClientSoftwareVersion.
  //
  // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0.
  //
  // Version 5 adds ClientInstanceId
  "validVersions": "0-5",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ClientSoftwareName", "type": "string", "versions": "3+",
      "ignorable": true, "about": "The name of the client." },
    { "name": "ClientSoftwareVersion", "type": "string", "versions": "3+",
      "ignorable": true, "about": "The version of the client." },
    { "name": "ClientInstanceId", "type": "uuid", "versions": "5+",
      "ignorable": true, "about": "Unique ID for this client instance."}
  ]
}

This KIP adds a ClientInstanceId field to the existing ApiVersions RPC, which already includes ClientSoftwareName and ClientSoftwareVersion.

Errors

The following error is new for the PushConfig RPC:

  • Exception: ClientConfigTooLargeException

  • Error code: CONFIG_TOO_LARGE

  • Description: The client sent a PushConfig request that was larger than client.config.max.bytes.

  • Client action: Log the ErrorMessage from the response, then continue without retrying.


PushConfigRequest

{
  "apiKey": NEXT,
  "type": "request",
  "listeners": ["broker"],
  "name": "PushConfigRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Configs", "type": "[]Config", "versions": "0+",
      "about": "The client configuration entries.", "fields": [
      { "name": "ConfigKey", "type": "string", "versions": "0+",
        "about": "The configuration key."},
      { "name": "ConfigValue", "type": "string", "versions": "0+",
        "about": "The configuration value."},
      { "name": "ConfigType", "type": "int8", "versions": "0+",
        "about": "ConfigDef.Type of the ConfigValue field."},
      { "name": "IsDefault", "type": "bool", "versions": "0+",
        "about": "Boolean where true means the configuration value wasn't changed by the user."}
    ]}
  ]
}

After receiving ApiVersionsResponse, the client collects the configuration values and sends them in a PushConfigRequest. The client typically sends this request once during bootstrap, before invoking client APIs. Retries use retry.backoff.ms, retry.backoff.max.ms, and default.api.timeout.ms, similar to ApiVersions.
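The retry timing can be sketched as below, assuming the same doubling scheme Kafka clients use for retry.backoff.ms and retry.backoff.max.ms (KIP-580). Jitter is omitted to keep the example deterministic. PushConfigRetryTimer is a hypothetical name, and default.api.timeout.ms bounds the overall attempt.

```java
/**
 * Sketch of PushConfig retry timing: exponential backoff from retry.backoff.ms up to
 * retry.backoff.max.ms, bounded overall by default.api.timeout.ms (jitter omitted).
 */
final class PushConfigRetryTimer {
    private final long backoffMs;
    private final long backoffMaxMs;
    private final long deadlineMs;

    PushConfigRetryTimer(long nowMs, long retryBackoffMs, long retryBackoffMaxMs, long defaultApiTimeoutMs) {
        this.backoffMs = retryBackoffMs;
        this.backoffMaxMs = retryBackoffMaxMs;
        this.deadlineMs = nowMs + defaultApiTimeoutMs;
    }

    /** Delay before retry number {@code attempt} (0-based): backoff * 2^attempt, capped at the max. */
    long backoffForAttempt(int attempt) {
        double delay = backoffMs * Math.pow(2, attempt);
        return (long) Math.min(delay, backoffMaxMs);
    }

    /** True if a retry started after waiting {@code delayMs} would still begin before the deadline. */
    boolean canRetry(long nowMs, long delayMs) {
        return nowMs + delayMs < deadlineMs;
    }
}
```

With retry.backoff.ms=100 and retry.backoff.max.ms=1000, the delays are 100, 200, 400, 800 ms and then stay at 1000 ms until the timeout is reached.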

The ConfigType field is an integer that maps to the ConfigDef.Type enum.
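This KIP does not list the numeric codes for ConfigType, so the mapping below is an assumption for illustration only. It mirrors the constant names of ConfigDef.Type in a hypothetical WireConfigType enum with explicit codes rather than ordinal(), so that reordering the enum could never change the wire format. Unknown codes (for example from a newer client) decode to null, leaving the broker free to treat such values as opaque strings.

```java
/**
 * Hypothetical wire mapping for the PushConfig ConfigType (int8) field. Constant names
 * mirror ConfigDef.Type; the numeric codes are illustrative assumptions.
 */
enum WireConfigType {
    BOOLEAN((byte) 1), STRING((byte) 2), INT((byte) 3), SHORT((byte) 4), LONG((byte) 5),
    DOUBLE((byte) 6), LIST((byte) 7), CLASS((byte) 8), PASSWORD((byte) 9);

    private final byte code;

    WireConfigType(byte code) {
        this.code = code;
    }

    /** Value written to the ConfigType field by the client. */
    byte code() {
        return code;
    }

    /** Decodes the int8 sent by a client; returns null for codes this broker does not know. */
    static WireConfigType fromCode(byte code) {
        for (WireConfigType type : values()) {
            if (type.code == code) {
                return type;
            }
        }
        return null;
    }
}
```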

PushConfigResponse

{
  "apiKey": NEXT,
  "type": "response",
  "name": "PushConfigResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {
      "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."},
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error."},
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." }
  ]
}

PushConfigResponse has a field named ErrorMessage that contains a brief reason for the error.

Error Handling

Clients retry on network and retriable errors. If throttled (ThrottleTimeMs > 0), the client waits for ThrottleTimeMs before retrying. Fatal errors (e.g., UNSUPPORTED_VERSION, authentication failures) are not retried and should throw runtime exceptions.

Configuration push is performed on a best-effort basis. Failure to send the configuration should not prevent the client from functioning.
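The error-handling rules in this section can be sketched as a small classifier. The error names are Kafka error codes mentioned in this KIP plus SASL_AUTHENTICATION_FAILED as an example authentication failure; the Action enum, the class name, and the choice to wait for the larger of the throttle time and the backoff are assumptions of this sketch.

```java
/** Hypothetical client-side classification of a PushConfig response or failure. */
final class PushConfigErrorHandling {
    enum Action { DONE, RETRY, LOG_AND_CONTINUE, FAIL }

    static Action classify(String error, boolean retriable) {
        return switch (error) {
            case "NONE" -> Action.DONE;
            // Not retried; logged, and the client keeps working (best effort)
            case "CONFIG_TOO_LARGE", "INVALID_CONFIG" -> Action.LOG_AND_CONTINUE;
            // Fatal errors are not retried and surface as runtime exceptions
            case "UNSUPPORTED_VERSION", "SASL_AUTHENTICATION_FAILED" -> Action.FAIL;
            default -> retriable ? Action.RETRY : Action.LOG_AND_CONTINUE;
        };
    }

    /** Assumed rule: a throttled client waits at least ThrottleTimeMs, and never less than its backoff. */
    static long retryDelayMs(int throttleTimeMs, long backoffMs) {
        return Math.max(throttleTimeMs, backoffMs);
    }
}
```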

Proposed Changes

Disabling the Feature

This feature can be disabled on the broker and the client. For the broker, remove client.config.policy.class.name or set its value to null. When this configuration is missing, the ApiVersions response will not include support for the feature, so the client doesn’t send configuration. For the client, set enable.config.push to false, in which case the client skips the entire configuration handshake.

Broker Behavior

If a broker is configured with client.config.policy.class.name, the ApiVersions response advertises support for the PushConfig RPC. Whenever a client sends a PushConfig request, the broker calls the policy with the client configuration for observability.

Configuration Payload Size Enforcement

As described below, client implementations should not attempt to send an oversized payload in the first place. As a backup measure against clients sending too much data, however, the broker checks the request against the new client.config.max.bytes configuration before invoking the policy. If the size of the PushConfig request exceeds client.config.max.bytes, the broker returns the CONFIG_TOO_LARGE error to the client.
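A simplified sketch of the broker-side ordering: the size check runs first, and only then is the policy invoked with the entries and the broker's receive timestamp. The nested records and the Consumer standing in for ClientConfigPolicy.process() are hypothetical. IllegalArgumentException stands in for Kafka's InvalidConfigException, and the mapping of other exceptions to UNKNOWN_SERVER_ERROR is an assumption.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified broker-side handling of one PushConfig request. */
final class PushConfigHandler {
    record Entry(String key, String value, byte type, boolean isDefault) {}
    record Data(List<Entry> configs, long timestamp) {}
    record Response(String error, String errorMessage) {}

    private final int maxBytes;          // client.config.max.bytes
    private final Consumer<Data> policy; // stands in for ClientConfigPolicy.process()

    PushConfigHandler(int maxBytes, Consumer<Data> policy) {
        this.maxBytes = maxBytes;
        this.policy = policy;
    }

    Response handle(int requestSizeBytes, List<Entry> configs, long receivedAtMs) {
        // Enforced before the policy runs, so process() never sees an oversized payload
        if (requestSizeBytes > maxBytes) {
            return new Response("CONFIG_TOO_LARGE", "PushConfig request of " + requestSizeBytes
                    + " bytes exceeds client.config.max.bytes=" + maxBytes);
        }
        try {
            policy.accept(new Data(configs, receivedAtMs));
            return new Response("NONE", null);
        } catch (IllegalArgumentException e) {
            // Stand-in for an InvalidConfigException raised by a policy that rejects the configuration
            return new Response("INVALID_CONFIG", e.getMessage());
        } catch (RuntimeException e) {
            return new Response("UNKNOWN_SERVER_ERROR", e.getMessage());
        }
    }
}
```

In a broker, receivedAtMs would come from the broker clock when the request arrives, and the plugin-error and plugin-process-time metrics would naturally be recorded around the policy.accept() call.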

An analysis of the different Java clients’ configuration shows that a default of 10 KB for client.config.max.bytes provides more than sufficient capacity:

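  • AdminClientConfig: ~25 non-sensitive configuration keys; total request size < 1 KB

  • ConsumerConfig: ~50 non-sensitive configuration keys; total request size < 2 KB

  • ProducerConfig: ~45 non-sensitive configuration keys; total request size < 2 KB

  • StreamsConfig: ~55 non-sensitive configuration keys (excluding embedded producer, consumer, and admin configuration); total request size ~2 KB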

A default 10 KB limit provides ~5x headroom for the largest expected use case, while preventing abuse from malicious or errant clients.
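The size estimates above can be checked with a rough calculator. This sketch assumes Kafka's flexible-version encoding for the PushConfigRequest body (request header excluded): compact arrays and strings carry an unsigned-varint length of n + 1, int8 and bool take one byte each, and each struct ends with an empty tagged-field section of one byte. PushConfigSizeEstimator is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical estimator for the serialized PushConfigRequest body size (header excluded). */
final class PushConfigSizeEstimator {
    /** Bytes needed to encode {@code value} as an unsigned varint (7 bits per byte). */
    static int unsignedVarintSize(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    /** Compact string: varint(length + 1) followed by the UTF-8 bytes. */
    static int compactStringSize(String s) {
        int len = s.getBytes(StandardCharsets.UTF_8).length;
        return unsignedVarintSize(len + 1) + len;
    }

    static int estimate(Map<String, String> configs) {
        int size = unsignedVarintSize(configs.size() + 1); // compact array length
        for (Map.Entry<String, String> e : configs.entrySet()) {
            size += compactStringSize(e.getKey());   // ConfigKey
            size += compactStringSize(e.getValue()); // ConfigValue
            size += 1;                               // ConfigType (int8)
            size += 1;                               // IsDefault (bool)
            size += 1;                               // empty tagged fields of the entry
        }
        return size + 1;                             // empty top-level tagged fields
    }
}
```

For example, a single entry linger.ms=5 encodes to 17 bytes. Under the assumption of 50 entries with 25-byte keys and 10-byte values, each entry takes 40 bytes, for roughly 2,000 bytes in total, which is consistent with the estimates above.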

Excluding Sensitive Configuration

In addition to the client-side exclusion explained in the Concepts section, the ClientConfigPolicy implementation may provide its own logic to ensure that clients do not send sensitive configuration. Clients across the Apache Kafka ecosystem do not share a consistent naming convention, so brokers cannot determine sensitivity from the configuration key name and instead rely on the incoming ConfigType field. When the ClientConfigPolicy detects sensitive configuration, it includes a description of the violation in the RPC response.
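A minimal sketch of such a type-based check, assuming a stand-in Type enum that mirrors ConfigDef.Type and a hypothetical SensitiveTypeCheck helper. The returned message is the kind of description the policy could place in the response. Treating PASSWORD and CLASS as sensitive follows the client-side criteria in the Concepts section.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical policy-side check that relies only on the declared ConfigType. */
final class SensitiveTypeCheck {
    enum Type { BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD }

    record Entry(String key, Type type) {}

    /** Returns a violation description for the response, or null if nothing sensitive was sent. */
    static String findViolations(List<Entry> configs) {
        List<String> offending = configs.stream()
                .filter(e -> e.type() == Type.PASSWORD || e.type() == Type.CLASS)
                .map(Entry::key)
                .collect(Collectors.toList());
        return offending.isEmpty()
                ? null
                : "Sensitive configuration must not be pushed: " + String.join(", ", offending);
    }
}
```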

Metrics

The following new broker metrics are added:

  • instance-count (type: Gauge; group: client-config; tags: none): The current number of unique client instance IDs. NOTE: this metric comes from KIP-714 but will likely be refactored in KIP-1313.

  • plugin-config-count, plugin-config-rate (type: Meter; group: client-config; tag: client_instance_id): The total number/rate of PushConfig requests passed to the ClientConfigPolicy plugin, regardless of success or failure.

  • plugin-error-count, plugin-error-rate (type: Meter; group: client-config; tag: client_instance_id): The total number/rate of errors raised while preparing for and/or invoking the plugin’s process() method.

  • plugin-process-time-avg, plugin-process-time-max (type: Avg and Max; group: client-config; tag: client_instance_id): The length of time, in milliseconds, the broker spent invoking the plugin’s process() method.

Client Behavior

Handshake

A client that supports this configuration interface uses ApiVersions to identify a node that supports the API. The client performs the handshake by collecting its configuration values and issuing a PushConfig RPC to submit them to the broker node. The client sends the RPC after authentication (if any) and before it starts to use the connection for requests. Similar to the ApiVersions handshake, the PushConfig RPC is bounded by a fixed timeout of default.api.timeout.ms. If the RPC exhausts its retries, the client logs the error but continues execution.

Here’s the sequence:

  1. Client connects to broker

  2. Send ApiVersions request (internal, automatic)

  3. Receive ApiVersions response to determine which features broker supports

  4. If enable.config.push is set and configuration push is supported by the broker

    1. Collect requested client configuration values

    2. Send PushConfig with configuration

    3. Receive PushConfig response

  5. User requests can now be sent

The client randomly selects a node for its configuration handshake, in the same way as for GetTelemetrySubscriptions. The configuration is sent once per client instance, not once per broker or per connection: as connections are closed due to disconnects or aging out, no new configuration handshake is performed.

The handshake is performed on a best-effort basis. Network or other transient errors that occur when transmitting the configuration data must not prevent the client from functioning.
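The handshake rules above (skip when disabled or unsupported, pick a random supporting node, push once per client instance, never fail the client) can be sketched as follows. ConfigPushHandshake and its Sender interface are hypothetical; Sender stands in for sending PushConfig and awaiting the response, with retries assumed to happen inside it. Blocking user API calls until the handshake completes is not modeled here.

```java
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical client-side driver for the one-time, best-effort configuration handshake. */
final class ConfigPushHandshake {
    /** Stand-in for "send PushConfig to this node and wait for the response (with retries)". */
    interface Sender {
        void send(String nodeId) throws Exception;
    }

    private final AtomicBoolean attempted = new AtomicBoolean(false);
    private final boolean enabled; // enable.config.push
    private final Random random;

    ConfigPushHandshake(boolean enabled, Random random) {
        this.enabled = enabled;
        this.random = random;
    }

    /** Returns true if the configuration was pushed; never throws (best effort). */
    boolean maybePush(List<String> nodesSupportingPushConfig, Sender sender) {
        if (!enabled || nodesSupportingPushConfig.isEmpty()) {
            return false; // disabled on the client, or no broker advertised PushConfig in ApiVersions
        }
        if (!attempted.compareAndSet(false, true)) {
            return false; // already done for this client instance; reconnects do not repeat it
        }
        String node = nodesSupportingPushConfig.get(random.nextInt(nodesSupportingPushConfig.size()));
        try {
            sender.send(node);
            return true;
        } catch (Exception e) {
            System.err.println("PushConfig to node " + node + " failed; continuing: " + e.getMessage());
            return false;
        }
    }
}
```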

Blocking Behavior

From the user’s perspective, constructing the client does not block; instead, client API calls block until the handshake completes.

For example:

// KafkaProducer constructor returns immediately
KafkaProducer<String, String> producer = new KafkaProducer<>(props);   // ✓ Non-blocking

// First send() blocks until connection is ready (including configuration handshake)
producer.send(record);                                                 // ← Blocks up to max.block.ms waiting for READY state

Excluding Sensitive Configuration

The configuration entries sent in the PushConfig request should exclude any sensitive information. Which configuration keys are considered sensitive is determined by the client library.

Including Configuration Data Types

Depending on the server-side implementation, preserving the data types enables more compact storage, reduces redundant conversion, reduces cognitive overhead in downstream use, and eliminates invalid data (e.g. storing a value of “hi!” in a boolean). It is also important because the server is not aware of all the different clients and their respective configuration, so providing the name, type, and value, though at times redundant, allows for wider client compatibility.
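As an illustration of how the declared type lets a server reject invalid data, the sketch below converts a pushed string value into a typed Java value. The Type enum mirrors the non-sensitive ConfigDef.Type names, and TypedValueParser is a hypothetical helper; the parsing rules are assumptions, not Kafka's ConfigDef parsing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical broker-side conversion of a pushed string value using its declared type. */
final class TypedValueParser {
    enum Type { BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST }

    /** Throws IllegalArgumentException (or NumberFormatException) when the value does not fit its type. */
    static Object parse(String value, Type type) {
        return switch (type) {
            case BOOLEAN -> {
                String v = value.trim();
                if (!v.equalsIgnoreCase("true") && !v.equalsIgnoreCase("false")) {
                    throw new IllegalArgumentException("Not a boolean: " + value);
                }
                yield Boolean.parseBoolean(v);
            }
            case STRING -> value;
            case INT -> Integer.parseInt(value.trim());
            case SHORT -> Short.parseShort(value.trim());
            case LONG -> Long.parseLong(value.trim());
            case DOUBLE -> Double.parseDouble(value.trim());
            case LIST -> value.isBlank()
                    ? List.of()
                    : Arrays.stream(value.split(",")).map(String::trim).collect(Collectors.toList());
        };
    }
}
```

A value such as "hi!" declared as BOOLEAN is rejected instead of being stored.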

Metrics

This KIP does not introduce any new client metrics.

Compatibility, Deprecation, and Migration Plan

Impact on Existing Users

Broker

  • No policy configured: Feature is effectively disabled

    • No new RPCs are advertised in ApiVersions

    • If a client sends a PushConfig request anyway, the broker returns an error

    • No performance or behavioral impact

  • Policy configured: Feature is enabled

    • New RPCs are advertised in ApiVersions

    • Broker receives and processes config pushes from supporting clients

    • Older clients (no support) are unaffected

    • Minimal resource overhead

  • Upgrade path: Rolling upgrade safe

    • Old brokers: don't advertise config push APIs, clients skip handshake

    • New brokers: advertise APIs, clients perform handshake if enabled

    • No cross-version issues

Client

  • Older clients (no support): No impact

    • Don't check for config push APIs

    • Behavior identical to pre-KIP

  • Newer clients with feature disabled: No impact

    • enable.config.push=false

    • Handshake skipped

    • Behaviorally identical to older clients

  • Newer clients with feature enabled (default): Minor impact

    • Additional RTT during connection setup (PushConfig)

    • Estimated 10-50ms added latency to first user request (depends on network RTT)

    • One-time cost per client instance lifetime

Migration

This is a purely additive feature and requires no migration.

  • Before KIP: No client configuration visibility

  • After KIP: Incremental visibility as clients upgrade

  • Breaking changes: n/a

  • Deprecation: n/a

Test Plan

Integration Tests

Happy Path

  1. Test complete handshake with success

Client Startup

  1. Test producer, consumer, admin, and Kafka Streams client with enable.config.push set to true performs handshake

  2. Test producer, consumer, admin, and Kafka Streams clients with enable.config.push set to false skip the handshake entirely

  3. Test Kafka Streams does not perform separate handshake for embedded producer/consumer/admin clients

Broker Configuration

  1. Test broker with client.config.policy.class.name set advertises APIs in ApiVersions

  2. Test broker without policy (null) does not advertise config push APIs

  3. Test broker with policy invokes process() on successful PushConfig

Mixed Broker Versions (Rolling Upgrade)

  1. Test old brokers (no config push support) don't advertise APIs

  2. Test new brokers advertise APIs

  3. Test clients detect support via ApiVersions and only handshake with new brokers

  4. Test clients work correctly when connecting to mix of old and new brokers

Retries

  1. Test client retries PushConfig on retriable errors

  2. Test client does not retry on CONFIG_TOO_LARGE or INVALID_CONFIG

  3. Test exponential backoff is applied correctly

Timeout Handling

  1. Test handshake respects default.api.timeout.ms

  2. Test client continues if handshake times out (best-effort feature)

  3. Test timeout does not block subsequent operations

Throttling

  1. Test client waits for ThrottleTimeMs before retrying if throttled

System Tests

Multiple Client Types

  1. Test Java KafkaProducer, KafkaConsumer, AdminClient, and KafkaStreams application and verify each client type sends appropriate configs based on the type of client

Large Payloads

  1. Test config payload near client.config.max.bytes limit

  2. Test config payload exceeding client.config.max.bytes returns CONFIG_TOO_LARGE

Policy

  1. Test custom ClientConfigPolicy rejects configs via InvalidConfigException

  2. Test client receives INVALID_CONFIG error with an appropriate message

Rejected Alternatives

Exclusion of Default Values

The number of configuration entries grows with each release, and the vast majority of entries use default settings. The question arises: how should we handle configuration entries that use a default value? Here are some options for the client:

  1. Send all configuration entries to the cluster, including those with default values.

  2. Omit any configuration entries that use their respective default values.

  3. Send all configuration entries, but omit the value and demarcate those that use default values.

It’s redundant to send configuration with known default values. Preparing and sending the name, type, and value for scores of configuration entries could add up to a couple of KB in network transit. Additionally, the server-side node that receives the handshake request would then have to handle requests that consist mostly of entries with default values. It is easy to argue that the defaults are superfluous and should not be included.

Whether or not to include configuration entries with default values somewhat depends on what the ClientConfigPolicy implementation plans to do with those entries. Also, keep in mind that the server receiving the handshakes may service many different clients from different languages and versions. Even though the client knows when the configuration entry’s value is the default value, the broker handling the handshake may have no idea that, for example, topic.compression.level=low from the 2.5.2 version of the Visual Basic Kafka client is the default value for that client. Requiring each implementor of ClientConfigPolicy to maintain a listing of all the default values across all available Kafka clients and their respective versions seems like overkill.

The stance of this KIP is that there is no need to exclude default configuration, based on the following:

  1. The configuration handshake only occurs once during the lifetime of a client.

  2. The configuration handshake request size is negligible compared with the amount of network traffic over the course of the lifetime of a client.

  3. The server node handling the incoming configuration handshake can drop any entries that come in with a default value.

Requiring brokers to know the default values a priori (to fill in the missing information) would be a maintenance problem.
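Point 3 above needs no catalog of defaults on the server, because each entry carries the client-computed IsDefault flag. A minimal sketch, assuming a hypothetical DefaultEntryFilter used inside a ClientConfigPolicy implementation:

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical policy-side filter that keeps only entries the user explicitly set. */
final class DefaultEntryFilter {
    record Entry(String key, String value, boolean isDefault) {}

    /** Drops entries flagged IsDefault by the client; no per-client default table is required. */
    static List<Entry> userSetOnly(List<Entry> configs) {
        return configs.stream()
                .filter(e -> !e.isDefault())
                .collect(Collectors.toList());
    }
}
```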

Including Configuration Storage Detail

Storage and retention of the configuration data is outside the scope of this KIP. The ClientConfigPolicy implementation is responsible for managing the storage, if any, of the configuration payload once the broker invokes it.
