Status
Current state: Under Discussion
Discussion thread: here
Vote thread: here
JIRA: KAFKA-19112
Motivation
In Kafka, some configurations allow users to specify one or more values as a comma-separated string. However, these configurations currently do not validate whether the input is empty or contains duplicate values, and the documentation does not clearly state whether empty values or null values are allowed.
This ambiguity can easily mislead users into thinking that providing an empty list or null is acceptable. Although duplicate values do not affect system behavior, they introduce semantic redundancy. To maintain clarity and avoid potential misconfigurations, empty lists should be disallowed for those configurations that cause the system to malfunction and duplicate values should be ignored unless they have well defined semantics.
In systems design, a fail-fast system is one that immediately reports at its interface any condition that is likely to indicate a failure. Fail-fast systems are usually designed to stop normal operation rather than attempt to continue a possibly flawed process.
To improve clarity and prevent configuration errors, we propose rejecting empty or null entries and ignoring duplicate values for the affected configurations. This approach aligns with the fail-fast principle: invalid inputs should be caught as early as possible, ideally during configuration parsing rather than at runtime or during server startup.
The following configurations will be updated to enforce this validation:
- group.coordinator.rebalance.protocols
- process.roles
- etc...
Going forward, passing an empty list to any of these configs will result in a ConfigException.
To support this change, we should also update the Kafka documentation to clearly state that these configurations do not allow empty values. Additionally, the system should throw a ConfigException during configuration parsing to immediately alert users to any invalid input.
We should also align the behavior of LIST-type and STRING-type configurations.
- Currently, there are three representations for an empty list default value: null, an empty string (""), and an empty list ([]). We should standardize all LIST-type configuration default values to use List.of().
- Additionally, some configurations are currently defined as String but use comma-separated values to represent lists. These should be updated to use the proper List-type instead of relying on string parsing.
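The standardization described above can be sketched as a single normalization step. This is only an illustration: `ListDefaults` and `normalizeListDefault` are hypothetical names, not Kafka APIs, and the sketch assumes list elements are plain strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Kafka: collapses the three "empty"
// representations (null, "" and []) into one canonical empty list.
class ListDefaults {

    static List<String> normalizeListDefault(Object rawDefault) {
        if (rawDefault == null) {
            return List.of();
        }
        if (rawDefault instanceof String) {
            String trimmed = ((String) rawDefault).trim();
            if (trimmed.isEmpty()) {
                return List.of();
            }
            // A comma-separated STRING value reinterpreted as a LIST.
            return Arrays.stream(trimmed.split(","))
                    .map(String::trim)
                    .collect(Collectors.toList());
        }
        if (rawDefault instanceof List) {
            return ((List<?>) rawDefault).stream()
                    .map(String::valueOf)
                    .collect(Collectors.toList());
        }
        throw new IllegalArgumentException("Unsupported default value: " + rawDefault);
    }

    public static void main(String[] args) {
        System.out.println(normalizeListDefault(null));
        System.out.println(normalizeListDefault(""));
        System.out.println(normalizeListDefault(List.of()));
        System.out.println(normalizeListDefault("PLAINTEXT://:9092, SSL://:9093"));
    }
}
```

With a single canonical representation, callers no longer need separate null, blank-string, and empty-list checks.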
We define a new meaning for an empty cleanup.policy value: it indicates infinite retention, which is equivalent to setting retention.ms=-1 and retention.bytes=-1.
- If cleanup.policy is empty and remote.storage.enable is set to true, local log segments will be cleaned based on the values of log.local.retention.bytes and log.local.retention.ms.
- If cleanup.policy is empty and remote.storage.enable is set to false, local log segments will not be deleted automatically. However, records can still be removed explicitly through deleteRecords API calls, which will advance the log start offset and delete the corresponding log segments.
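The rules for an empty cleanup.policy can be read as a small decision function. The sketch below only restates the two cases above; `CleanupPolicySketch` and `localRetentionBehavior` are hypothetical names used for illustration and do not correspond to Kafka internals.

```java
import java.util.List;

// Hypothetical illustration of the empty cleanup.policy semantics; not Kafka code.
class CleanupPolicySketch {

    static String localRetentionBehavior(List<String> cleanupPolicy, boolean remoteStorageEnabled) {
        if (!cleanupPolicy.isEmpty()) {
            // Non-empty policies keep their existing meaning.
            return "governed by cleanup.policy=" + String.join(",", cleanupPolicy);
        }
        if (remoteStorageEnabled) {
            // Empty policy with tiered storage: local retention settings still apply.
            return "local segments cleaned per log.local.retention.ms and log.local.retention.bytes";
        }
        // Empty policy without tiered storage: nothing is deleted automatically.
        return "no automatic deletion; only deleteRecords advances the log start offset";
    }

    public static void main(String[] args) {
        System.out.println(localRetentionBehavior(List.of(), true));
        System.out.println(localRetentionBehavior(List.of(), false));
        System.out.println(localRetentionBehavior(List.of("compact", "delete"), false));
    }
}
```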
Public Interfaces
- org.apache.kafka.common.config.ConfigDef$ValidList
- org.apache.kafka.common.config.TopicConfig
- org.apache.kafka.common.config.SaslConfigs
- org.apache.kafka.server.config.KRaftConfigs
- org.apache.kafka.coordinator.group.GroupCoordinatorConfig
- org.apache.kafka.tools.BrokerApiVersionsCommand$AdminClient
- org.apache.kafka.connect.mirror.MirrorSourceTaskConfig
- org.apache.kafka.connect.mirror.MirrorCheckpointTaskConfig
- org.apache.kafka.connect.mirror.MirrorCheckpointConfig
- org.apache.kafka.connect.runtime.rest.RestServerConfig
- org.apache.kafka.connect.mirror.MirrorClientConfig
- org.apache.kafka.server.config.ServerConfigs
- org.apache.kafka.network.SocketServerConfigs
- org.apache.kafka.server.config.ServerLogConfigs
- org.apache.kafka.server.metrics.ClientMetricsConfigs
- org.apache.kafka.server.metrics.MetricConfigs
- org.apache.kafka.connect.transforms.ReplaceField
- org.apache.kafka.connect.transforms.MaskField
- org.apache.kafka.connect.transforms.DropHeaders
- org.apache.kafka.connect.transforms.ValueToKey
- org.apache.kafka.connect.runtime.SinkConnectorConfig
- org.apache.kafka.connect.runtime.WorkerConfig
- org.apache.kafka.connect.mirror.MirrorMakerConfig
- org.apache.kafka.connect.mirror.MirrorConnectorConfig
- org.apache.kafka.clients.consumer.ConsumerConfig
- org.apache.kafka.clients.producer.ProducerConfig
- org.apache.kafka.connect.mirror.MirrorSourceConfig
- org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
- org.apache.kafka.connect.mirror.DefaultTopicFilter
- org.apache.kafka.common.config.internals.BrokerSecurityConfigs
Proposed Changes
We will make the following changes to ValidList:
- Introduce the in(boolean, String...) public method: creates validators that can optionally reject empty lists while still enforcing the set of valid strings.
- Introduce the anyNonDuplicateValues(boolean, boolean) public method: creates validators that reject duplicate values and, depending on the isEmptyAllowed and isNullAllowed settings, also reject empty lists or null.
- Update the ensureValid() method to throw a ConfigException if the user provides duplicate values in the list, and add a null check.
public static class ValidList implements Validator {
    final ValidString validString;
    final boolean isEmptyAllowed;
    final boolean isNullAllowed;

    private ValidList(List<String> validStrings, boolean isEmptyAllowed, boolean isNullAllowed) {
        this.validString = new ValidString(validStrings);
        this.isEmptyAllowed = isEmptyAllowed;
        this.isNullAllowed = isNullAllowed;
    }

    public static ValidList anyNonDuplicateValues(boolean isEmptyAllowed, boolean isNullAllowed) {
        return new ValidList(List.of(), isEmptyAllowed, isNullAllowed);
    }

    public static ValidList in(String... validStrings) {
        return new ValidList(List.of(validStrings), true, false);
    }

    public static ValidList in(boolean isEmptyAllowed, String... validStrings) {
        if (validStrings.length == 0) {
            throw new IllegalArgumentException("Valid strings list cannot be empty for inNonEmpty validator");
        }
        return new ValidList(List.of(validStrings), isEmptyAllowed, false);
    }

    @Override
    public void ensureValid(final String name, final Object value) {
        if (value == null) {
            if (isNullAllowed)
                return;
            else
                throw new ConfigException("Configuration '" + name + "' values must not be null.");
        }

        @SuppressWarnings("unchecked")
        List<Object> values = (List<Object>) value;
        if (!isEmptyAllowed && values.isEmpty()) {
            String validString = this.validString.validStrings.isEmpty() ? "any non-empty value" : this.validString.toString();
            throw new ConfigException("Configuration '" + name + "' must not be empty. Valid values include: " + validString);
        }

        if (values.size() > 1 && Set.copyOf(values).size() != values.size()) {
            throw new ConfigException("Configuration '" + name + "' values must not be duplicated.");
        }

        validateIndividualValues(name, values);
    }

    private void validateIndividualValues(String name, List<Object> values) {
        boolean hasValidStrings = !validString.validStrings.isEmpty();
        for (Object value : values) {
            if (value instanceof String) {
                String string = (String) value;
                if (string.isEmpty()) {
                    throw new ConfigException("Configuration '" + name + "' values must not be empty.");
                }
                if (hasValidStrings) {
                    validString.ensureValid(name, value);
                }
            }
        }
    }

    public String toString() {
        return !validString.validStrings.isEmpty() ? validString.toString() : "";
    }
}
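To make the intended outcomes concrete, the following standalone sketch mirrors the order of checks in ensureValid() without depending on Kafka. It is an approximation under stated assumptions: `ValidListDemo`, `check`, and `outcome` are hypothetical names, IllegalArgumentException stands in for ConfigException, and list elements are assumed to be non-null strings (Set.copyOf throws NullPointerException on null elements).

```java
import java.util.List;
import java.util.Set;

// Stand-in that mirrors the checks in ensureValid(); not the Kafka class itself.
class ValidListDemo {

    static void check(String name, List<String> values, Set<String> allowed,
                      boolean isEmptyAllowed, boolean isNullAllowed) {
        if (values == null) {
            if (isNullAllowed) {
                return;
            }
            throw new IllegalArgumentException("Configuration '" + name + "' values must not be null.");
        }
        if (!isEmptyAllowed && values.isEmpty()) {
            throw new IllegalArgumentException("Configuration '" + name + "' must not be empty.");
        }
        // Set.copyOf collapses duplicates, so a size mismatch means a repeated entry.
        if (Set.copyOf(values).size() != values.size()) {
            throw new IllegalArgumentException("Configuration '" + name + "' values must not be duplicated.");
        }
        for (String v : values) {
            if (v.isEmpty()) {
                throw new IllegalArgumentException("Configuration '" + name + "' values must not be empty.");
            }
            // An empty 'allowed' set means any non-empty string is accepted.
            if (!allowed.isEmpty() && !allowed.contains(v)) {
                throw new IllegalArgumentException("Invalid value " + v + " for configuration " + name);
            }
        }
    }

    // Returns "ok" or the rejection message, for easy printing.
    static String outcome(String name, List<String> values, Set<String> allowed,
                          boolean isEmptyAllowed, boolean isNullAllowed) {
        try {
            check(name, values, allowed, isEmptyAllowed, isNullAllowed);
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Set<String> roles = Set.of("broker", "controller");
        System.out.println(outcome("process.roles", List.of("broker", "controller"), roles, false, false));
        System.out.println(outcome("process.roles", List.of(), roles, false, false));
        System.out.println(outcome("process.roles", List.of("broker", "broker"), roles, false, false));
        System.out.println(outcome("log.dirs", null, Set.of(), false, true));
    }
}
```

The first and last calls pass; the second is rejected as empty and the third as duplicated, matching the behavior described for in(isEmptyAllowed = false) and anyNonDuplicateValues(isNullAllowed = true, isEmptyAllowed = false).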
The following configurations will be modified. Where a value is unchanged, a dash is shown instead.
| Component | Configuration Class | Config Name | Before Type | After Type | Before Default Value | After Default Value | Before Validator | After Validator | Before Valid Value | After Valid Value | Before Exception | After Exception |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Security | SaslConfigs | sasl.oauthbearer.expected.audience | - | - | null | List.of() | null | anyNonDuplicateValues | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | SslConfigs | ssl.cipher.suites | - | - | null | List.of() | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | ssl.enabled.protocols | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| Broker/Controller | KRaftConfigs | process.roles | - | - | - | - | ValidList.in | in(isEmptyAllowed = false) | empty list [7], non-empty list, duplicate value | non-empty list | ConfigException (empty list, duplicate value) | ConfigException |
| | | controller.listener.names | String | List | null | NO_DEFAULT_VALUE (Object) | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list [8], non-empty list, duplicate value | non-empty list | IllegalArgumentException (empty list, null) | ConfigException |
| | LogConfig#SERVER_CONFIG_DEF | log.cleanup.policy | - | - | - | - | ValidList.in | in(isEmptyAllowed = true) | empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | LogConfig#CONFIG_DEF | cleanup.policy [1] | - | - | - | - | ValidList.in | in(isEmptyAllowed = true) | empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | BrokerSecurityConfigs | ssl.cipher.suites | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | sasl.enabled.mechanisms | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | sasl.kerberos.principal.to.local.rules | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | sasl.oauthbearer.expected.audience | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | RestServerConfig | rest.extension.classes | - | - | "" | List.of() | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | ServerConfigs | early.start.listeners [2] | String | List | - | - | null | anyNonDuplicateValues (isNullAllowed = true, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | null, empty list, non-empty list | none | ConfigException |
| | | config.providers | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | SocketServerConfigs | listeners | String | List | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list [9], non-empty list, duplicate value | non-empty list | none | ConfigException |
| | | advertised.listeners | String | List | - | - | null | anyNonDuplicateValues (isNullAllowed = true, isEmptyAllowed = false) | null, empty list [10], non-empty list, duplicate value | null, non-empty list | IllegalArgumentException (broker empty list) | ConfigException |
| | ServerLogConfigs | log.dirs [3] | String | List | - | - | null | anyNonDuplicateValues (isNullAllowed = true, isEmptyAllowed = false) | null, empty list [11], non-empty list, duplicate value | null, non-empty list | IllegalArgumentException (empty list) | ConfigException |
| | | log.dir [4] | String | List | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list [12], non-empty list, duplicate value | non-empty list | none | ConfigException |
| | ClientMetricsConfigs | metrics | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | NullPointerException(null) | ConfigException |
| | | match | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | MetricConfigs | metric.reporters | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | kafka.metrics.reporters | - | - | "" | List.of() | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | NullPointerException(null) | ConfigException |
| | GroupCoordinatorConfig | group.consumer.assignors | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list, non-empty list, duplicate value | non-empty list | IndexOutOfBoundsException (empty list) | ConfigException |
| | | group.share.assignors | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list, non-empty list, duplicate value | non-empty list | IllegalArgumentException (empty list) | ConfigException |
| | | group.coordinator.rebalance.protocols | - | - | - | - | ValidList.in | in(isEmptyAllowed = false) | empty list [13], non-empty list, duplicate value | non-empty list | none | ConfigException |
| Connect | ReplaceField | exclude | - | - | - | - | null | anyNonDuplicateValues | null, empty list, non-empty list, duplicate value | empty list, non-empty list | NullPointerException(null) | ConfigException |
| | | include | - | - | - | - | null | anyNonDuplicateValues | null, empty list, non-empty list, duplicate value | empty list, non-empty list | NullPointerException(null) | ConfigException |
| | MaskField | fields | - | - | - | - | NonEmptyListValidator | anyNonDuplicateValues | non-empty list, duplicate value | non-empty list | ConfigException(null, empty list) | ConfigException |
| | DropHeaders | headers | - | - | - | - | NonEmptyListValidator | anyNonDuplicateValues | non-empty list, duplicate value | non-empty list | ConfigException(null, empty list) | ConfigException |
| | ValueToKey | fields | - | - | - | - | NonEmptyListValidator | anyNonDuplicateValues | non-empty list, duplicate value | non-empty list | ConfigException(null, empty list) | ConfigException |
| | SinkConnectorConfig | topics | - | - | "" | List.of() | null | anyNonDuplicateValues | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | WorkerConfig | metric.reporters | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | config.providers | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | bootstrap.servers | - | - | localhost:9092 [6] | NO_DEFAULT_VALUE (Object) | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list [14], non-empty list, duplicate value | non-empty list | none | ConfigException |
| | | plugin.path [5] | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = true, isEmptyAllowed = false) | null, empty list [15], non-empty list, duplicate value | null, non-empty list | none | ConfigException |
| MirrorMaker | MirrorClientConfig | bootstrap.servers | - | - | null | NO_DEFAULT_VALUE (Object) | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list [16], non-empty list, duplicate value | non-empty list | none | ConfigException |
| | MirrorSourceTaskConfig | task.assigned.partitions | - | - | null | NO_DEFAULT_VALUE (Object) | - | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list [17], non-empty list, duplicate value | non-empty list | none | ConfigException |
| | MirrorCheckpointTaskConfig | task.assigned.groups | - | - | null | NO_DEFAULT_VALUE (Object) | - | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list [18], non-empty list, duplicate value | non-empty list | none | ConfigException |
| | MirrorCheckpointConfig | groups | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | groups.exclude | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | MirrorMakerConfig | config.providers | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | clusters | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | MirrorConnectorConfig | metric.reporters | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | config.providers | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | MirrorSourceConfig | topics | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | topics.exclude | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | config.properties.exclude | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | DefaultConfigPropertyFilter | config.properties.exclude | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | DefaultTopicFilter | topics | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | topics.exclude | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| Admin | AdminClientConfig | bootstrap.servers | - | - | "" | List.of() | - | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | ConfigException(both bootstrap.servers and bootstrap.controllers are empty or null) | ConfigException |
| | | bootstrap.controllers | - | - | "" | List.of() | - | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | ConfigException(both bootstrap.servers and bootstrap.controllers are empty or null) | ConfigException |
| | | metric.reporters | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | config.providers | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | BrokerApiVersionsCommand$AdminClient | bootstrap.servers | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list, non-empty list, duplicate value | non-empty list | ConfigException(empty list), NullPointerException(null) | ConfigException |
| Consumer | ConsumerConfig | metric.reporters | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | config.providers | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | bootstrap.servers | - | - | List.of() | NO_DEFAULT_VALUE (Object) | NonNullValidator | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | empty list [19], non-empty list, duplicate value | non-empty list | ConfigException(null) | ConfigException |
| | | interceptor.classes | - | - | - | - | NonNullValidator | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | empty list, non-empty list, duplicate value | empty list, non-empty list | ConfigException(null) | ConfigException |
| | | partition.assignment.strategy | - | - | - | - | NonNullValidator | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | empty list, non-empty list, duplicate value | empty list, non-empty list | ConfigException(null), IllegalStateException(empty list) | ConfigException |
| Producer | ProducerConfig | metric.reporters | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | config.providers | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | bootstrap.servers | - | - | List.of() | NO_DEFAULT_VALUE (Object) | NonNullValidator | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | empty list [20], non-empty list, duplicate value | non-empty list | ConfigException(null) | ConfigException |
| | | interceptor.classes | - | - | - | - | NonNullValidator | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | empty list, non-empty list, duplicate value | empty list, non-empty list | ConfigException(null) | ConfigException |
| Streams | StreamsConfig | metric.reporters | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | config.providers | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = true) | null, empty list, non-empty list, duplicate value | empty list, non-empty list | none | ConfigException |
| | | bootstrap.servers | - | - | - | - | null | anyNonDuplicateValues (isNullAllowed = false, isEmptyAllowed = false) | null, empty list [21], non-empty list, duplicate value | non-empty list | ConfigException(empty list) | ConfigException |
Note:
1. If cleanup.policy is empty and remote.storage.enable is set to true, the local log segments will be cleaned based on the values of log.local.retention.bytes and log.local.retention.ms.
2. early.start.listeners is optional (users are not required to set it), so the field can be omitted or set to an empty list. Duplicate values are still rejected regardless of these settings.
3. log.dirs is also optional (users are not required to set it). When provided, it must be a non-empty list with no duplicate values. A null value is acceptable, but an empty list will be rejected.
4. log.dir is also optional (users are not required to set it). When provided, it must be a non-empty list with no duplicate values. A null value is acceptable, but an empty list will be rejected.
5. plugin.path is also optional (users are not required to set it). When provided, it must be a non-empty list with no duplicate values. A null value is acceptable, but an empty list will be rejected.
6. Currently, the default value of bootstrap.servers is "localhost:9092". This is somewhat odd: a default value should ideally be useful, and "localhost:9092" is not useful in most scenarios.
7. Currently, process.roles allows an empty value during the storage format validation stage, but it will fail during server startup. Since the Kafka node relies on this configuration to determine its role, it is a required setting and should not be left empty.
8. Currently, if controller.listener.names is set to an empty list or null, it fails for different reasons depending on the node role. A broker node will return the error "controller.listener.names must contain at least one value when running KRaft with just the broker role", while a controller node will encounter validation errors when attempting to match the listeners configuration.
9. Currently, if listeners is set to an empty list, the Kafka node has no network interfaces or protocols to listen on.
10. Currently, if advertised.listeners is set to an empty string, an exception is thrown only when the node is running as a broker. In other scenarios, no exception is raised. However, this configuration still results in the node not advertising any listener addresses, making it unreachable to clients.
11. Currently, if log.dirs is set to an empty list and log.dir is empty, Kafka will fail during the storage format phase with an error stating that "At least one log directory must be defined via log.dirs or log.dir".
12. Currently, if log.dir is set to an empty list and log.dirs is empty, Kafka will fail during the storage format phase with an error stating that "At least one log directory must be defined via log.dirs or log.dir".
13. Currently, if group.coordinator.rebalance.protocols is set to an empty list, the server cannot determine which rebalance protocol to use for the consumer group.
14. Currently, if bootstrap.servers is set to an empty list, initializing KafkaAdminClient fails when looking up the Kafka cluster ID.
15. Currently, if plugin.path is set to an empty list, no user-specified plugin locations are processed, but plugins from the parent classloader are still loaded. This may go against the user's explicit intention, as they might expect that no plugins would be loaded at all. In contrast, if plugin.path is not set (null), it is reasonable to load plugins from the parent classloader, as the user is likely relying on the default behavior.
16. Currently, if bootstrap.servers is set to an empty list, initializing ForwardingAdmin fails.
17. Currently, if task.assigned.partitions is set to an empty list, no MirrorSourceTask can be created.
18. Currently, if task.assigned.groups is set to an empty list, no Checkpoint can be created.
19. Currently, if bootstrap.servers is set to an empty list, constructing a consumer fails because there are no resolvable bootstrap URLs.
20. Currently, if bootstrap.servers is set to an empty list, constructing a producer fails because there are no resolvable bootstrap URLs.
21. Currently, if bootstrap.servers is set to an empty list, initializing KafkaAdminClient and the consumer fails.
Compatibility, Deprecation, and Migration Plan
This KIP makes three main changes:
- Disallowing null values for most LIST-type configurations. This makes sense because users cannot explicitly set a configuration to null in a properties file. Therefore, only configurations with a default value of null should be allowed to accept null.
- Ignoring duplicate values. This is reasonable because few Kafka configurations currently require repeating the same value, and allowing duplicates can confuse users. Therefore, most LIST-type configurations no longer accept duplicates unless explicitly supported. For backward compatibility, unsupported duplicate entries will be ignored and a warning will be logged.
- Disallowing empty lists for affected configurations, even though many configurations currently accept them. In practice, setting an empty list for several of these configurations can lead to server startup failures or unexpected behavior. Therefore, enforcing non-empty lists helps prevent misconfiguration and improves system robustness.
These changes may introduce some backward incompatibility, but this trade-off is justified by the significant improvements in safety, consistency, and overall user experience.
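The backward-compatible handling of duplicates (ignore them and log a warning) could look roughly like the sketch below. It is an assumption-laden illustration rather than the KIP's implementation: `DuplicateFilter` and `dropDuplicates` are hypothetical names, and java.util.logging is used only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical helper, not part of Kafka: removes repeated entries while
// preserving first-seen order, and warns when anything was dropped.
class DuplicateFilter {
    private static final Logger LOG = Logger.getLogger(DuplicateFilter.class.getName());

    static List<String> dropDuplicates(String name, List<String> values) {
        // LinkedHashSet keeps insertion order, so the first occurrence wins.
        LinkedHashSet<String> unique = new LinkedHashSet<>(values);
        if (unique.size() != values.size()) {
            LOG.warning("Configuration '" + name + "' contains duplicate entries; "
                    + "duplicates are ignored: " + values);
        }
        return new ArrayList<>(unique);
    }

    public static void main(String[] args) {
        System.out.println(dropDuplicates("metric.reporters", List.of("a.Reporter", "b.Reporter", "a.Reporter")));
    }
}
```

Preserving order matters for configurations where the position of an entry is meaningful, which is why a LinkedHashSet is used here instead of a plain HashSet.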
Additionally, we introduce two minor adjustments:
- Reclassify some STRING-type configurations as LIST-type, particularly those using comma-separated values to represent multiple entries. This change reflects the actual semantics used in Kafka.
- Update the default values for some configurations to better align with other configs.
These changes will not introduce any compatibility issues.
The migration plan:
- Introduce the new methods in() and anyNonDuplicateValues() in ValidList.
- Update the usage of all configurations to follow the validation rules defined above.
Test Plan
- All existing tests should pass.
- Add new tests for ValidList#anyNonDuplicateValues and ValidList#in.
Rejected Alternatives
Using a new cleanup.policy type 'none'. This approach was rejected for the following reasons:
- Infinite retention is already achievable by setting log.retention.ms = -1 and log.retention.bytes = -1.
- Introducing a new type could introduce unnecessary complexity and diverge from the existing retention semantics.
- Allowing an empty list for cleanup.policy (which implies no compaction or deletion) is more consistent with the general configuration model for list-based configs.