Status
Current state: Draft
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Client configuration represents the number one pain point hindering organizations’ adoption and/or expansion of Kafka. The large number of available configuration options bewilders teams new to the ecosystem, and even accomplished teams are not immune to the occasional misconfiguration. Additionally, the teams developing and deploying client applications in an organization are often separate and intentionally siloed from the teams that manage the organization's Kafka cluster (the most extreme case being management via a hosted service). This complexity unnecessarily delays projects and contributes to operational overhead, as configuration tuning often requires specialized expertise.
As Kafka’s usage increases within organizations, centralized observability of client configuration becomes increasingly critical. This KIP takes the first step in solving this issue by introducing a standard interface by which clients can push their configuration to the cluster, thus providing the cornerstone for observability and troubleshooting.
Similar to previous initiatives (e.g. KIP-714), this KIP makes the mechanism opt-in on the broker and opt-out on the client. It is strongly advised that this KIP be implemented and enabled by default in all Kafka client libraries, so that its functionality is available without requiring direct interaction from cluster operators. This will boost developer productivity, accelerate project deployment, and reduce operational overhead, thereby lowering the barrier to adoption and expansion for Kafka.
Public Interfaces
This KIP adds a new RPC named PushConfig to the Kafka protocol. It forms a handshake between the client and broker by which the client sends its configuration (keys and values) to the broker. Brokers use a new ClientConfigPolicy interface to process the RPC from the client.
Concepts
This section introduces key concepts used throughout this KIP.
Client Instance ID
ClientInstanceId was introduced in KIP-714 and is a UUID version 4-based value that provides a unique client ID. This ID is not a secret or token, but a value with which brokers can correlate different clients. If a client and broker support the features from both KIP-714 and this KIP, the client must use the same ClientInstanceId for both configuration and telemetry. Clients generate a new client instance ID on startup, before any network activity. The ClientInstanceId is tied to the client, not its connections: a client uses the same ID for all broker connections. The ID remains valid for the lifetime of the client process and is stored only in memory, so a new ClientInstanceId is generated each time the client restarts. Brokers receive the same ClientInstanceId in all ApiVersions requests from a given client but do not coordinate, validate, or track its origin; they implicitly trust the ID. The value is stored in the RequestContext alongside ClientSoftwareName and ClientSoftwareVersion.
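The ID lifecycle described above can be sketched in Java as follows. This is a minimal illustration, not any client's actual code. ClientIdentity and ApiVersionsFields are hypothetical names, and java.util.UUID stands in for whatever UUID type a client library uses. The sketch generates the ID once per client and reuses it for every connection.

```java
import java.util.UUID;

// Hypothetical stand-in for the ApiVersionsRequest v5 fields a client fills in.
record ApiVersionsFields(String clientSoftwareName, String clientSoftwareVersion, UUID clientInstanceId) {}

/** Hypothetical per-client identity holder: one instance per producer/consumer/admin client. */
final class ClientIdentity {
    // Generated once, at client construction, before any network activity. Kept only in memory,
    // so a restarted client gets a new ID. UUID.randomUUID() produces a version 4 UUID.
    private final UUID clientInstanceId = UUID.randomUUID();
    private final String softwareName;
    private final String softwareVersion;

    ClientIdentity(String softwareName, String softwareVersion) {
        this.softwareName = softwareName;
        this.softwareVersion = softwareVersion;
    }

    UUID clientInstanceId() {
        return clientInstanceId;
    }

    // Called for each new broker connection. The ID belongs to the client, not the connection,
    // so every connection reports the same value.
    ApiVersionsFields apiVersionsFields() {
        return new ApiVersionsFields(softwareName, softwareVersion, clientInstanceId);
    }
}
```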
Sensitive Configuration
Some configuration values are sensitive in that they may leak credentials, PII, or other information that could violate an organization’s data policies. For this reason, each client defines a fixed default set of configuration keys whose values are sent to brokers. This set is carefully vetted to ensure that no sensitive data is leaked. Most of the non-sensitive configuration values are numeric (e.g. linger.ms), boolean (e.g. enable.auto.commit), or one of a fixed set of enums (e.g. share.acquire.mode).
If the default set of non-sensitive configuration is still too sensitive, users can replace it with the new config.push.allowed.keys configuration, a comma-separated list of configuration keys to send instead. config.push.allowed.keys itself is not sent to the broker unless it is explicitly included in its own value.
Default Configuration for Apache Kafka Java Producer
By default, the Producer sends values for the following configuration keys:
acks
batch.size
buffer.memory
client.id
compression.type
delivery.timeout.ms
enable.idempotence
linger.ms
max.in.flight.requests.per.connection
request.timeout.ms
retries
Most of these values are numeric or one of a fixed set of enums, which greatly limits potential security exposure.
Default Configuration for Apache Kafka Java Consumer
By default, the Consumer sends values for the following configuration keys:
auto.offset.reset
client.id
enable.auto.commit
fetch.min.bytes
fetch.max.wait.ms
group.id
isolation.level
max.poll.interval.ms
max.poll.records
session.timeout.ms
Most of these values are numeric or one of a fixed set of enums, which greatly limits potential security exposure.
Default Configuration for Apache Kafka Java Share Consumer
By default, the Share Consumer sends values for the following configuration keys:
client.id
fetch.max.wait.ms
fetch.min.bytes
group.id
max.poll.interval.ms
max.poll.records
share.acknowledgement.mode
share.acquire.mode
All configuration keys are then subject to an additional check to ensure they are not sensitive. A configuration key is considered sensitive if it meets any of the following criteria:
Security-related configuration
Class or implementation names (in the Java reference implementation, where ConfigDef.Type equals CLASS)
Passwords (in the Java reference implementation, where ConfigDef.Type equals PASSWORD)
Custom configuration (e.g. used by the application, SerDes, etc.), since Kafka cannot determine whether these could contain sensitive information
All configuration keys starting with sasl., security., or ssl.
All configuration keys containing .sasl., .security., or .ssl.
All configuration keys ending in .class or .classes
If a given configuration key (either from the default set or from config.push.allowed.keys) meets any of the above criteria, the client logs a warning and does not send that key's value to the broker. If all configuration keys are deemed sensitive, the client does not send anything configuration-related to the broker (i.e. it does not send a PushConfig request with an empty set of configuration).
Client libraries that use a different naming convention for their configuration should adjust their exclusion logic accordingly.
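The selection and exclusion rules above can be sketched as follows. The sketch assumes a Java client whose configuration keys follow the Apache Kafka naming convention. ConfigPushFilter and its local Type enum (mirroring ConfigDef.Type) are hypothetical, and java.util.logging stands in for the client's real logger. A key with no known type is treated as custom configuration and is therefore considered sensitive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical sketch of the client-side selection and sensitivity rules. */
final class ConfigPushFilter {
    private static final Logger LOG = Logger.getLogger(ConfigPushFilter.class.getName());
    static final String ALLOWED_KEYS_CONFIG = "config.push.allowed.keys";

    // Local mirror of the ConfigDef.Type values.
    enum Type { BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD }

    record Entry(String key, Object value, Type type) {}

    /** The user's override list if present, otherwise the client's built-in default list. */
    static List<String> keysToPush(Map<String, ?> userConfigs, List<String> defaultKeys) {
        Object override = userConfigs.get(ALLOWED_KEYS_CONFIG);
        if (override == null) {
            return defaultKeys;
        }
        List<String> keys = new ArrayList<>();
        for (String key : override.toString().split(",")) {
            if (!key.isBlank()) {
                keys.add(key.trim());
            }
        }
        return keys;
    }

    /** A null type means the key is not defined by the client, i.e. custom configuration. */
    static boolean isSensitive(String key, Type type) {
        if (type == null || type == Type.CLASS || type == Type.PASSWORD) {
            return true;
        }
        for (String prefix : List.of("sasl.", "security.", "ssl.")) {
            if (key.startsWith(prefix) || key.contains("." + prefix)) {
                return true;
            }
        }
        return key.endsWith(".class") || key.endsWith(".classes");
    }

    /**
     * Builds the entries to send. resolvedValues holds the effective value of each key
     * (user-supplied or default); definedTypes maps each key the client defines to its type.
     * An empty result means no PushConfig request is sent at all.
     */
    static List<Entry> collect(Map<String, ?> resolvedValues, Map<String, Type> definedTypes, List<String> keys) {
        List<Entry> entries = new ArrayList<>();
        for (String key : keys) {
            Type type = definedTypes.get(key);
            if (isSensitive(key, type)) {
                LOG.warning("Not pushing configuration '" + key + "' because it is considered sensitive");
                continue;
            }
            entries.add(new Entry(key, resolvedValues.get(key), type));
        }
        return entries;
    }
}
```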
Client Configuration Policy
This KIP introduces a new broker-side (Java) interface named ClientConfigPolicy. This interface processes (stores, etc.) the client configuration pushed to the broker. Apache Kafka operators can install and configure a ClientConfigPolicy implementation to suit their specific needs using the configuration described later.
End-to-end Flow
The following sequence diagram depicts the flow between the various entities:
The flow includes the following steps:
The client establishes a connection to the broker.
The client sends an ApiVersions request.
If client.config.policy.class.name is configured, the broker advertises support for the PushConfig RPC in the ApiVersions response.
The client collects the configuration values, constructs a PushConfig request, and sends it to the broker.
The broker validates the PushConfig request and invokes ClientConfigPolicy.process() to handle the configuration. In this example, the implementation writes the configuration snapshot to external storage for observability.
ClientConfigPolicy.process() completes successfully.
The broker returns a successful PushConfig response.
The client completes initialization and is ready for user API calls (e.g., send(), poll()).
ClientConfigData
When the broker receives a PushConfig request, it creates a ClientConfigData instance that includes the configuration values (ClientConfig entries) from the client. The broker also generates a UTC timestamp representing when the request was received and includes it in the ClientConfigData.
package org.apache.kafka.server.policy.clientconfig;
import org.apache.kafka.common.config.ConfigDef;
/**
* Record containing an individual configuration entry pushed by the client.
*/
public record ClientConfig(String key, Object value, ConfigDef.Type type, boolean isDefault) {}
package org.apache.kafka.server.policy.clientconfig;
import java.util.List;
/**
* Record containing the PushConfig API data.
*
* <p/>
*
* The client configuration values come directly from the RPC. The broker
* will supply its current timestamp for the value of the same name.
*/
public record ClientConfigData(List<ClientConfig> configs, long timestamp) {}
ClientConfigPolicy
The broker exposes a plugin interface named ClientConfigPolicy that provides the API for processing the configuration sent by the client. The interface is used on the broker to interact with the PushConfig RPC.
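package org.apache.kafka.server.policy.clientconfig;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;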
/**
* An interface for intercepting and enforcing client configuration.
*
* <p/>
*
* If <code>client.config.policy.class.name</code> is defined, Kafka will
* create an instance of the specified class using the default constructor and
* will then pass the broker configs to its <code>configure()</code> method.
* During broker shutdown, the <code>close()</code> method will be invoked
* so that resources can be released (if necessary).
*/
@InterfaceStability.Evolving
public interface ClientConfigPolicy extends Reconfigurable, AutoCloseable {
/**
* Receive the {@link ClientConfigData} data for observability.
* <p/>
* <em>Note 1</em>: the implementation of this method must not block.
* <p/>
* <em>Note 2</em>: this method will <em>not</em> be invoked if the size of the
* PushConfig request was larger than {@code client.config.max.bytes}.
*/
void process(AuthorizableRequestContext context, ClientConfigData pushConfigData);
}
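Because process() must not block, one plausible implementation pattern is to hand each snapshot to a bounded queue that a background thread drains. The sketch below uses simplified, self-contained stand-ins for the proposed types: a hypothetical SimpleClientConfigPolicy, a String client ID in place of AuthorizableRequestContext, and no Reconfigurable methods. QueueingConfigPolicy and its drop-when-full behavior are illustrative choices, not part of this KIP.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Simplified stand-ins for the proposed types (the real process() also receives an
// AuthorizableRequestContext, and the real interface extends Reconfigurable).
record ClientConfig(String key, Object value, boolean isDefault) {}
record ClientConfigData(List<ClientConfig> configs, long timestamp) {}

interface SimpleClientConfigPolicy extends AutoCloseable {
    void process(String clientId, ClientConfigData data);
}

/** Hypothetical policy: process() only enqueues, and a background thread does the slow I/O. */
final class QueueingConfigPolicy implements SimpleClientConfigPolicy {
    record Snapshot(String clientId, ClientConfigData data) {}

    private final BlockingQueue<Snapshot> queue = new ArrayBlockingQueue<>(1_000);
    private final AtomicLong dropped = new AtomicLong();
    private final Thread writer;
    private volatile boolean running = true;

    QueueingConfigPolicy(Consumer<Snapshot> sink) {
        writer = new Thread(() -> {
            // Keep draining after close() until the queue is empty.
            while (running || !queue.isEmpty()) {
                try {
                    Snapshot snapshot = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (snapshot != null) {
                        sink.accept(snapshot); // e.g. write to external storage
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "client-config-writer");
        writer.setDaemon(true);
        writer.start();
    }

    @Override
    public void process(String clientId, ClientConfigData data) {
        // offer() never blocks: when the queue is full, the snapshot is dropped and counted.
        if (!queue.offer(new Snapshot(clientId, data))) {
            dropped.incrementAndGet();
        }
    }

    long droppedCount() {
        return dropped.get();
    }

    @Override
    public void close() {
        running = false;
        try {
            writer.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Dropping snapshots when the queue is full keeps request handling latency bounded at the cost of occasionally losing a snapshot. An implementation that must never lose data would need a different trade-off.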
ClientConfigTooLargeException
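The broker checks the size of the configuration data sent via the PushConfig RPC. If it is larger than client.config.max.bytes, the broker does not invoke the policy and instead returns the CONFIG_TOO_LARGE error, represented by the following exception: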
package org.apache.kafka.common.errors;
/**
* This exception indicates that the size of the client configuration data exceeded the
* broker's client.config.max.bytes configuration.
*/
public class ClientConfigTooLargeException extends ApiException {
public ClientConfigTooLargeException(String message) {
super(message);
}
}
Configuration
Broker
Setting client.config.policy.class.name to null disables the feature on the broker.
| Configuration name | Description | Values |
|---|---|---|
| client.config.policy.class.name | The client configuration policy class. The class must implement the ClientConfigPolicy interface. | Default: null |
| client.config.max.bytes | Maximum size for the configuration, in bytes | Default: 10 KB |
Client
Applies to the KafkaProducer, KafkaConsumer, and Admin clients, as well as Kafka Streams.
| Configuration name | Description | Values |
|---|---|---|
| enable.config.push | Controls whether the client performs the configuration handshake. The handshake runs once per client instance, not once per connection. | Type: boolean, Default: true |
| config.push.allowed.keys | Overrides the default set of configuration keys with the comma-separated list from this configuration. | Type: list, Default: the client's default set of keys |
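As an illustration, a producer could set the two client configurations above as shown below. This is a hedged sketch. The bootstrap address and the key list are placeholders, and only clients that implement this KIP would recognize enable.config.push and config.push.allowed.keys.

```java
import java.util.Properties;

/** Hypothetical helper that builds producer properties using the settings proposed here. */
final class ConfigPushSettings {
    static Properties producerProps(boolean pushConfig) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The handshake is on by default; "false" skips it entirely.
        props.put("enable.config.push", Boolean.toString(pushConfig));
        // Replace the producer's default key list with a narrower one. Keys that match the
        // sensitivity rules (e.g. the CLASS-typed *.serializer settings) would be withheld even if listed.
        props.put("config.push.allowed.keys", "acks,linger.ms,batch.size,compression.type");
        return props;
    }
}
```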
Protocol
ApiVersionsRequest
{
"apiKey": 18,
"type": "request",
"listeners": ["broker", "controller"],
"name": "ApiVersionsRequest",
// Versions 0 through 2 of ApiVersionsRequest are the same.
//
// Version 3 is the first flexible version and adds ClientSoftwareName and ClientSoftwareVersion.
//
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in the response from being 0.
//
// Version 5 adds ClientInstanceId
"validVersions": "0-5",
"flexibleVersions": "3+",
"fields": [
{ "name": "ClientSoftwareName", "type": "string", "versions": "3+",
"ignorable": true, "about": "The name of the client." },
{ "name": "ClientSoftwareVersion", "type": "string", "versions": "3+",
"ignorable": true, "about": "The version of the client." },
{ "name": "ClientInstanceId", "type": "uuid", "versions": "5+",
"ignorable": true, "about": "Unique ID for this client instance."}
]
}
This KIP adds a ClientInstanceId field to the existing ApiVersions RPC, which already includes ClientSoftwareName and ClientSoftwareVersion.
Errors
The following error is new for this RPC:
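| Exception | Error Code | Description | Client Action |
|---|---|---|---|
| ClientConfigTooLargeException | CONFIG_TOO_LARGE | Client sent a request in which the configuration data exceeded the broker's client.config.max.bytes | Log the error and do not retry |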
PushConfigRequest
{
"apiKey": NEXT,
"type": "request",
"listeners": ["broker"],
"name": "PushConfigRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "Configs", "type": "[]Config", "versions": "0+",
"about": "The client configuration entries.", "fields": [
{ "name": "ConfigKey", "type": "string", "versions": "0+",
"about": "The configuration key."},
{ "name": "ConfigValue", "type": "string", "versions": "0+",
"about": "The configuration value."},
{ "name": "ConfigType", "type": "int8", "versions": "0+",
"about": "ConfigDef.Type of the ConfigValue field."},
{ "name": "IsDefault", "type": "bool", "versions": "0+",
"about": "Boolean where true means the configuration value wasn't changed by the user."}
]}
]
}
After receiving ApiVersionsResponse, the client collects the configuration values and sends them in a PushConfigRequest. The client typically sends this request once during bootstrap, before invoking client APIs. Retries use retry.backoff.ms, retry.backoff.max.ms, and default.api.timeout.ms, similar to ApiVersions.
The ConfigType field is an integer that maps to the ConfigDef.Type enum.
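The KIP states only that ConfigType maps to ConfigDef.Type. The sketch below assigns a hypothetical explicit id to each type. An explicit id is safer than ordinal(), which would change if the enum were reordered. The sketch also shows how a broker-side plugin might turn ConfigValue strings into the typed values carried by ClientConfig, rejecting unparseable input such as “hi!” for a boolean. ConfigTypeCodec is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical decoder for one PushConfig entry (ConfigValue string plus ConfigType int8). */
final class ConfigTypeCodec {
    // Local mirror of ConfigDef.Type. The numeric ids are an assumption: the KIP only says the
    // int8 maps to the enum. Fixed ids, unlike ordinal(), survive reordering of the enum.
    enum Type {
        BOOLEAN(1), STRING(2), INT(3), SHORT(4), LONG(5), DOUBLE(6), LIST(7), CLASS(8), PASSWORD(9);

        final byte id;

        Type(int id) {
            this.id = (byte) id;
        }

        static Type fromId(byte id) {
            for (Type type : values()) {
                if (type.id == id) {
                    return type;
                }
            }
            throw new IllegalArgumentException("Unknown ConfigType " + id);
        }
    }

    /** Converts the wire-level string into a typed value and rejects values that do not parse. */
    static Object decode(String value, byte configType) {
        Type type = Type.fromId(configType);
        if (value == null) {
            return null;
        }
        return switch (type) {
            case BOOLEAN -> parseBoolean(value);
            case INT -> Integer.valueOf(value.trim());
            case SHORT -> Short.valueOf(value.trim());
            case LONG -> Long.valueOf(value.trim());
            case DOUBLE -> Double.valueOf(value.trim());
            case LIST -> value.isBlank() ? List.of() : Arrays.stream(value.split(",")).map(String::trim).toList();
            case STRING, CLASS, PASSWORD -> value;
        };
    }

    // Stricter than Boolean.parseBoolean, which silently maps any non-"true" string to false.
    private static Boolean parseBoolean(String value) {
        String v = value.trim();
        if (v.equalsIgnoreCase("true")) {
            return Boolean.TRUE;
        }
        if (v.equalsIgnoreCase("false")) {
            return Boolean.FALSE;
        }
        throw new IllegalArgumentException("Not a boolean: " + value);
    }
}
```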
PushConfigResponse
{
"apiKey": NEXT,
"type": "response",
"name": "PushConfigResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{
"name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."},
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error."},
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The top-level error message, or null if there was no error." }
]
}
PushConfigResponse has a field named ErrorMessage that contains a brief reason for the error.
Error Handling
Clients retry on network and retriable errors. If throttled (ThrottleTimeMs > 0), the client waits before retrying. Fatal errors (e.g., UNSUPPORTED_VERSION, authentication failures) are not retried and should throw runtime exceptions.
Configuration support is performed on a best-effort basis. Failure to send the configuration should not prevent the client from functioning.
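The retry and throttling rules above can be sketched as a best-effort loop. PushConfigRetrier, Attempt, Outcome, and Sleeper are hypothetical names. The sketch also assumes a simple doubling backoff without jitter, and it injects the clock and sleep functions so it can be tested without real waiting. It returns an outcome instead of throwing, so the caller can keep the client running whatever the result.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.logging.Logger;

/** Hypothetical best-effort send loop for PushConfig. */
final class PushConfigRetrier {
    private static final Logger LOG = Logger.getLogger(PushConfigRetrier.class.getName());

    enum Outcome { SUCCESS, RETRIABLE_ERROR, FATAL_ERROR }

    /** Result of one PushConfig attempt, including ThrottleTimeMs from the response. */
    record Attempt(Outcome outcome, int throttleTimeMs) {}

    interface Sleeper {
        void sleep(long ms) throws InterruptedException;
    }

    /**
     * Retries retriable failures with exponential backoff (retry.backoff.ms doubling up to
     * retry.backoff.max.ms, no jitter), waits at least ThrottleTimeMs when throttled, and stops
     * once the next wait would pass the default.api.timeout.ms deadline. Never throws.
     */
    static Outcome push(Supplier<Attempt> send, long retryBackoffMs, long retryBackoffMaxMs,
                        long apiTimeoutMs, LongSupplier clockMs, Sleeper sleeper) {
        long deadline = clockMs.getAsLong() + apiTimeoutMs;
        long backoff = retryBackoffMs;
        while (true) {
            Attempt attempt = send.get();
            if (attempt.outcome() != Outcome.RETRIABLE_ERROR) {
                if (attempt.outcome() == Outcome.FATAL_ERROR) {
                    LOG.warning("PushConfig failed with a non-retriable error; not retrying");
                }
                return attempt.outcome();
            }
            long waitMs = Math.max(backoff, attempt.throttleTimeMs());
            if (clockMs.getAsLong() + waitMs >= deadline) {
                LOG.warning("PushConfig retries exhausted; continuing without pushing configuration");
                return Outcome.RETRIABLE_ERROR;
            }
            try {
                sleeper.sleep(waitMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Outcome.RETRIABLE_ERROR;
            }
            backoff = Math.min(backoff * 2, retryBackoffMaxMs);
        }
    }
}
```

In a real client the clock and sleeper would be System::currentTimeMillis and Thread::sleep. The three durations would come from retry.backoff.ms, retry.backoff.max.ms, and default.api.timeout.ms.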
Proposed Changes
Disabling the Feature
This feature can be disabled on the broker and the client. For the broker, remove client.config.policy.class.name or set its value to null. When this configuration is missing, the ApiVersions response will not include support for the feature, so the client doesn’t send configuration. For the client, set enable.config.push to false, in which case the client skips the entire configuration handshake.
Broker Behavior
If a broker is configured with client.config.policy.class.name, the ApiVersions response advertises support for the PushConfig RPC. Whenever a client sends a PushConfig request, the broker calls the policy with the client configuration for observability.
Configuration Payload Size Enforcement
Client implementations should not attempt to send a payload that is too large in the first place. As a safeguard against clients that send too much data anyway, the broker checks the request against the new client.config.max.bytes configuration before invoking the policy. If the size of the PushConfig request exceeds client.config.max.bytes, the broker returns the CONFIG_TOO_LARGE error to the client.
After analyzing the different Java clients’ configuration, a default of 10 KB for client.config.max.bytes provides more than sufficient capacity:
| Client | Number of Non-sensitive Configuration Keys | Total Request Size |
|---|---|---|
| | ~25 | < 1 KB |
| | ~50 | < 2 KB |
| | ~45 | < 2 KB |
| Kafka Streams | ~55 (excluding the embedded producer, consumer, and admin) | ~2 KB |
A default 10 KB limit provides ~5x headroom for the largest expected use case, while preventing abuse from malicious or errant clients.
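The broker-side ordering can be sketched as below: the size check runs first, then the policy is invoked with a broker-supplied timestamp. PushConfigHandler, Policy, and Response are hypothetical simplifications. The sketch uses error names as strings because this KIP does not yet assign a numeric code to CONFIG_TOO_LARGE, and it assumes the request size is already known from the received request.

```java
import java.util.List;

/** Hypothetical broker-side handling of one PushConfig request. */
final class PushConfigHandler {
    record Config(String key, String value, byte type, boolean isDefault) {}

    /** Error names only; no numeric error code is assumed. */
    record Response(String error, String errorMessage) {}

    /** Simplified stand-in for ClientConfigPolicy.process(). */
    interface Policy {
        void process(List<Config> configs, long timestampMs);
    }

    static Response handle(int requestSizeBytes, List<Config> configs, int maxBytes, Policy policy) {
        // The size check happens first, so an oversized payload never reaches the plugin.
        if (requestSizeBytes > maxBytes) {
            return new Response("CONFIG_TOO_LARGE",
                "PushConfig request is " + requestSizeBytes + " bytes; client.config.max.bytes is " + maxBytes);
        }
        // Epoch milliseconds, i.e. the UTC time at which the broker handled the request.
        policy.process(configs, System.currentTimeMillis());
        return new Response("NONE", null);
    }
}
```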
Excluding Sensitive Configuration
In addition to the client-side exclusion explained in the concepts section, the ClientConfigPolicy implementation may provide logic to ensure that clients do not send sensitive configuration. Clients across the Apache Kafka ecosystem do not share a consistent naming convention. As a result, brokers cannot determine sensitivity from the configuration key name and instead rely on the incoming ConfigType field. When the ClientConfigPolicy detects sensitive configuration, it includes a description of the violation in the RPC response.
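A policy that relies on ConfigType, as described above, might flag PASSWORD and CLASS entries regardless of their key names. The sketch below is a hypothetical helper with a local Type enum that mirrors ConfigDef.Type. How the returned description reaches the PushConfigResponse ErrorMessage is left to the broker and is not specified here.

```java
import java.util.List;

/** Hypothetical type-based sensitivity check that a ClientConfigPolicy could run. */
final class SensitiveTypeCheck {
    // Local mirror of ConfigDef.Type.
    enum Type { BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD }

    record ClientConfig(String key, Object value, Type type, boolean isDefault) {}

    /** Returns a description of the violation, or null if no entry has a sensitive type. */
    static String findViolation(List<ClientConfig> configs) {
        // Key names are not consulted: naming conventions differ across client libraries.
        List<String> offending = configs.stream()
            .filter(c -> c.type() == Type.PASSWORD || c.type() == Type.CLASS)
            .map(ClientConfig::key)
            .toList();
        return offending.isEmpty() ? null : "Client sent configuration with sensitive types: " + offending;
    }
}
```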
Metrics
The following new broker metrics are added:
| Name | Type | Group | Tags | Notes |
|---|---|---|---|---|
| | Gauge | client-config | | The current number of unique client instance IDs. NOTE: this is from KIP-714 but will likely be refactored in KIP-1313. |
| | Meter | client-config | | The total number/rate of |
| | Meter | client-config | | The total number/rate of errors raised during preparation for and/or invoking the plugin’s process() method. |
| | Avg and Max | client-config | client_instance_id | The length of time (in milliseconds) the broker spent invoking the plugin’s process() method. |
Client Behavior
Handshake
A client that supports this configuration interface identifies a node that supports the API using ApiVersions. The client performs the handshake by collecting its configuration values and issuing a PushConfig RPC to submit them to the broker node. The client sends the RPC after authentication (if any) and before it starts to use the connection for other requests. As with the ApiVersions handshake, the PushConfig RPC is bounded by a fixed timeout of default.api.timeout.ms. If the RPC exhausts its retries, the client logs the error but continues execution.
Here’s the sequence:
Client connects to broker
Send ApiVersions request (internal, automatic)
Receive ApiVersions response to determine which features the broker supports
If enable.config.push is set to true and the broker supports configuration push:
  Collect the requested client configuration values
  Send PushConfig with the configuration
  Receive PushConfig response
User requests can now be sent
The client randomly selects a node for its configuration handshake, in the same way as for GetTelemetrySubscriptions. The client configuration is sent only once per client instance, not once per broker or per connection: as connections are closed due to disconnects or aging out, no new configuration handshake is performed.
The handshake is performed on a best-effort basis. Network or other transient errors that occur when transmitting the configuration data must not prevent the client from functioning.
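The once-per-client rule and the random node choice can be sketched as follows. ConfigHandshake and Node are hypothetical. The sketch assumes the client already knows, from each node's ApiVersions response, whether that node advertises PushConfig. If no known node supports the RPC yet, it returns empty and keeps waiting, which fits the rolling-upgrade behavior described under compatibility.

```java
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical once-per-client-instance guard for the configuration handshake. */
final class ConfigHandshake {
    /** A known broker and whether its ApiVersions response advertised PushConfig. */
    record Node(int id, boolean supportsPushConfig) {}

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final boolean enabled; // value of enable.config.push
    private final Random random;

    ConfigHandshake(boolean enabled, Random random) {
        this.enabled = enabled;
        this.random = random;
    }

    /**
     * Returns the node to send PushConfig to, or empty if no handshake should run now:
     * the feature is disabled, no known node supports it yet, or this client already ran it.
     */
    Optional<Node> maybeSelectNode(List<Node> nodes) {
        if (!enabled) {
            return Optional.empty();
        }
        List<Node> candidates = nodes.stream().filter(Node::supportsPushConfig).toList();
        // Only claim the one-time handshake when there is a node to send it to.
        if (candidates.isEmpty() || !started.compareAndSet(false, true)) {
            return Optional.empty();
        }
        return Optional.of(candidates.get(random.nextInt(candidates.size())));
    }
}
```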
Blocking Behavior
From the user’s perspective, a client API call made before the handshake completes blocks until it does.
For example:
// KafkaProducer constructor returns immediately
KafkaProducer<String, String> producer = new KafkaProducer<>(props); // ✓ Non-blocking
// First send() blocks until the connection is ready (including the configuration handshake)
producer.send(record); // ← Blocks up to max.block.ms waiting for READY state
Excluding Sensitive Configuration
The configuration entries sent in the PushConfig request should exclude any sensitive information. Which configuration keys are considered sensitive is determined by the client library.
Including Configuration Data Types
Depending on the server-side implementation, preserving the data types enables more compact storage, reduces redundant conversion, reduces cognitive overhead in downstream use, and eliminates invalid data (e.g. storing a value of “hi!” in a boolean). It’s also important because the server is not aware of all the different clients and their respective configuration, so providing the name, type, and value, though at times redundant, allows for wider client compatibility.
Metrics
This KIP does not introduce any new client metrics.
Compatibility, Deprecation, and Migration Plan
Impact on Existing Users
Broker
No policy configured: Feature is effectively disabled
  No new RPCs are advertised in ApiVersions
  If a client sends PushConfig requests anyway, the broker returns an error
  No performance or behavioral impact
Policy configured: Feature is enabled
  PushConfig is advertised in ApiVersions
  Broker receives and processes configuration pushes from supporting clients
  Older clients (no support) are unaffected
  Minimal resource overhead
Upgrade path: Rolling upgrade safe
  Old brokers: don't advertise config push APIs, clients skip handshake
  New brokers: advertise APIs, clients perform handshake if enabled
  No cross-version issues
Client
Older clients (no support): No impact
  Don't check for config push APIs
  Behavior identical to pre-KIP
Newer clients with feature disabled: No impact
  enable.config.push=false: handshake skipped
  Behaviorally identical to older clients
Newer clients with feature enabled (default): Minor impact
  Additional RTT during connection setup (PushConfig)
  Estimated 10-50ms added latency to first user request (depends on network RTT)
  One-time cost per client instance lifetime
Migration
This is a purely additive feature and requires no migration.
Before KIP: No client configuration visibility
After KIP: Incremental visibility as clients upgrade
Breaking changes: n/a
Deprecation: n/a
Test Plan
Integration Tests
Happy Path
Test complete handshake with success
Client Startup
Test producer, consumer, admin, and Kafka Streams clients with enable.config.push set to true perform the handshake
Test producer, consumer, admin, and Kafka Streams clients with enable.config.push set to false skip the handshake entirely
Test Kafka Streams does not perform a separate handshake for embedded producer/consumer/admin clients
Broker Configuration
Test broker with client.config.policy.class.name set advertises APIs in ApiVersions
Test broker without policy (null) does not advertise config push APIs
Test broker with policy invokes process() on successful PushConfig
Mixed Broker Versions (Rolling Upgrade)
Test old brokers (no config push support) don't advertise APIs
Test new brokers advertise APIs
Test clients detect support via ApiVersions and only handshake with new brokers
Test clients work correctly when connecting to a mix of old and new brokers
Retries
Test client retries PushConfig on retriable errors
Test client does not retry on CONFIG_TOO_LARGE or INVALID_CONFIG
Test exponential backoff is applied correctly
Timeout Handling
Test handshake respects default.api.timeout.ms
Test client continues if handshake times out (best-effort feature)
Test timeout does not block subsequent operations
Throttling
Test client waits for ThrottleTimeMs before retrying if throttled
System Tests
Multiple Client Types
Test Java KafkaProducer, KafkaConsumer, AdminClient, and KafkaStreams applications and verify each client type sends the appropriate configs for its type
Large Payloads
Test config payload near client.config.max.bytes limit
Test config payload exceeding client.config.max.bytes returns CONFIG_TOO_LARGE
Policy
Test custom ClientConfigPolicy rejects configs via InvalidConfigException
Test client receives INVALID_CONFIG error with an appropriate message
Rejected Alternatives
Exclusion of Default Values
The number of configuration entries is getting larger with each release, the vast majority of which use default settings. The question arises: how should we handle configuration entries that use a default value? Here are some options for the client:
Send all configuration entries to the cluster, including those with default values.
Omit any configuration entries that use their respective default values.
Send all configuration entries, but omit the value for those that use default values and mark them as defaults.
It’s redundant to send configuration with known default values. Preparing and sending the name, type, and value for scores of configuration entries could add up to a couple of KB in network transit. Additionally, the server-side node that receives the handshake request then has to handle requests that consist mostly of entries with default values. It’s easy to argue that the defaults are superfluous and should not be included.
Whether or not to include configuration entries with default values somewhat depends on what the ClientConfigPolicy implementation plans to do with those entries. Also, keep in mind that the server receiving the handshakes may service many different clients from different languages and versions. Even though the client knows when the configuration entry’s value is the default value, the broker handling the handshake may have no idea that, for example, topic.compression.level=low from the 2.5.2 version of the Visual Basic Kafka client is the default value for that client. Requiring each implementor of ClientConfigPolicy to maintain a listing of all the default values across all available Kafka clients and their respective versions seems like overkill.
The stance of this KIP is that there is no need to exclude default configuration, based on the following:
The configuration handshake only occurs once during the lifetime of a client.
The configuration handshake request size is negligible compared with the amount of network traffic over the course of the lifetime of a client.
The server node handling the incoming configuration handshake can drop any entries that come in with a default value.
Requiring brokers to know the default values a priori (to fill in the missing information) is a maintenance problem.
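For example, a ClientConfigPolicy implementation that has no use for default values could drop them on arrival, using the IsDefault flag that the client already sends. DropDefaults is a hypothetical helper with a simplified ClientConfig record.

```java
import java.util.List;

/** Hypothetical policy helper that keeps only entries the user changed. */
final class DropDefaults {
    record ClientConfig(String key, Object value, boolean isDefault) {}

    static List<ClientConfig> overridesOnly(List<ClientConfig> configs) {
        // IsDefault is set by the client, so the broker needs no per-client table of defaults.
        return configs.stream().filter(c -> !c.isDefault()).toList();
    }
}
```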
Including Configuration Storage Detail
Storage and retention of the configuration data is outside the scope of this KIP. The ClientConfigPolicy implementation is responsible for managing the storage, if any, of the configuration payload once the broker invokes it.
