Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/vlqvcyl5j95t7w9rk3nnz09d0050yzh9
JIRA: KAFKA-10043
Motivation
Kafka allows users to configure how tool scripts behave through multiple methods:
- Command line arguments
- Properties passed as key=value pairs via command line (Command line properties)
- Configuration files
However, there is no standard, consistent way to determine which source takes precedence when multiple sources are provided. This inconsistency can confuse users and make the system more error-prone.
This KIP aims to establish a clear and uniform configuration precedence across Kafka tool scripts.
Furthermore, the validation of required arguments, such as --bootstrap-server and --bootstrap-controller, should consider multiple sources. Most existing tool scripts validate these arguments only by checking whether they were passed as command line arguments, even though they can also be specified through command line properties or configuration files.
Public Interfaces
Proposed Configuration Precedence
1) The proposed precedence order for Kafka tool scripts, from highest to lowest priority, is as follows:
- Command line arguments
- Properties passed as key=value pairs via command line (Command line properties)
- Configuration files
- Default values set by tool scripts and default values of command line arguments
- Default values set by Kafka components
In addition, a warning is issued when the same configuration appears in more than one source (command line arguments, command line properties, or configuration files). This helps with troubleshooting when users did not intend to specify the same configuration multiple times.
Note: property or map entries may have empty-string or null values because some configurations allow them, so a value is always used verbatim, even when it is empty or null.
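The precedence order above can be illustrated with a small worked example. This is only a sketch: the class name `PrecedenceExample` and the sample keys and values are hypothetical and chosen for illustration. Sources are applied from lowest to highest priority so that later sources overwrite earlier ones. Defaults set by Kafka components are not modeled here, since they apply only when a key is absent from every other source.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the Kafka codebase.
final class PrecedenceExample {

    /** Applies sources from lowest to highest priority, so later sources overwrite earlier ones. */
    static Map<String, Object> resolve(List<Map<String, Object>> lowestToHighest) {
        Map<String, Object> resolved = new LinkedHashMap<>();
        for (Map<String, Object> source : lowestToHighest) {
            if (source != null) {
                // Values are copied verbatim, including empty strings and nulls.
                resolved.putAll(source);
            }
        }
        return resolved;
    }

    /** Builds four sample sources (illustrative values only) and resolves them. */
    static Map<String, Object> example() {
        Map<String, Object> toolDefaults = new LinkedHashMap<>();
        toolDefaults.put("client.id", "console-producer");
        toolDefaults.put("acks", "-1");

        Map<String, Object> configFile = new LinkedHashMap<>();
        configFile.put("acks", "1");
        configFile.put("linger.ms", "50");

        Map<String, Object> commandLineProperties = new LinkedHashMap<>();
        commandLineProperties.put("linger.ms", "10");

        Map<String, Object> commandLineArguments = new LinkedHashMap<>();
        commandLineArguments.put("acks", "0");

        return resolve(Arrays.asList(
            toolDefaults, configFile, commandLineProperties, commandLineArguments));
    }
}
```

In this example, `acks` resolves to `0` (command line argument), `linger.ms` to `10` (command line property over configuration file), and `client.id` to `console-producer` (tool default, since no other source sets it).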
2) A new option is added to every affected tool script that does not currently follow the proposed precedence:
- Name: modern
- Description: Determines whether to enable the configuration precedence proposed in this KIP. The option will be deprecated in Kafka 5.0, from which point the tools will always follow the precedence proposed by this KIP, and it will be removed in Kafka 6.0.
- Type: boolean
- Default: false
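As a rough sketch of how a tool might switch between the two behaviors based on this option: the class name `ModernGateExample`, the method signature, and in particular the legacy order shown (configuration file overriding a command line argument) are assumptions made for illustration. Each existing tool has its own legacy behavior, which this KIP does not change when the option is left at its default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; names and the legacy order are assumptions.
final class ModernGateExample {

    static Map<String, Object> resolve(
        boolean modern,
        Map<String, Object> commandLineArguments,
        Map<String, Object> configFile
    ) {
        Map<String, Object> resolved = new HashMap<>();
        if (modern) {
            // Proposed precedence: command line arguments override the configuration file.
            resolved.putAll(configFile);
            resolved.putAll(commandLineArguments);
        } else {
            // Assumed legacy behavior of one particular tool: the configuration file wins.
            resolved.putAll(commandLineArguments);
            resolved.putAll(configFile);
        }
        return resolved;
    }
}
```

Keeping the legacy branch untouched while the option defaults to false means existing invocations behave exactly as before until users opt in.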
Validating Required Arguments from Multiple Sources
Required arguments (currently --bootstrap-server and --bootstrap-controller) should be validated after considering values from the following sources:
- Command line arguments
- Properties passed as key=value pairs via command line (Command line properties)
- Configuration files
An error should be raised only if an argument is still missing or invalid after all three sources have been checked.
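The validation flow described above might look like the following sketch. The class and method names are hypothetical, and treating a null or blank value as invalid is an assumption made for this example. Only the `bootstrap.servers` key is the real Kafka client configuration name.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical illustration class; not part of the Kafka codebase.
final class RequiredArgumentExample {

    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /**
     * Looks up the bootstrap servers in all three sources, from highest to lowest
     * priority, and fails only if no source provides a usable value.
     */
    static String requireBootstrapServer(
        Map<String, Object> commandLineArguments,
        Map<String, Object> commandLineProperties,
        Map<String, Object> configFile
    ) {
        for (Map<String, Object> source
                : Arrays.asList(commandLineArguments, commandLineProperties, configFile)) {
            if (source != null && source.containsKey(BOOTSTRAP_SERVERS)) {
                Object value = source.get(BOOTSTRAP_SERVERS);
                // Assumption: a null or blank value counts as invalid for this argument.
                if (value == null || value.toString().trim().isEmpty()) {
                    throw new IllegalArgumentException(
                        "Invalid value for " + BOOTSTRAP_SERVERS + ": '" + value + "'");
                }
                // The highest-priority source that defines the key decides the value.
                return value.toString().trim();
            }
        }
        throw new IllegalArgumentException(
            "Missing required argument: " + BOOTSTRAP_SERVERS
                + " was not found in command line arguments, command line properties,"
                + " or configuration files");
    }
}
```

With this shape, a tool invoked without --bootstrap-server still passes validation as long as a command line property or a configuration file supplies `bootstrap.servers`.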
Proposed Changes
To ensure that all tool scripts correctly honor the proposed configuration precedence, this KIP introduces helper methods in CommandLineUtils.java and requires all affected tools to use them. This prevents each script from implementing its own logic and ensures consistent behavior.
Since the codebase currently uses three different argument parsers (joptsimple, argparse4j, and AbstractConnectCli), the helper methods are designed to be simple and generic, without depending on any specific external library.
Helper methods
The following code demonstrates the key helper methods:
/**
 * Merge multiple configuration sources according to priority, from highest to lowest:
 * 1) Command line arguments
 * 2) Properties passed as key=value pairs via command line (Command line properties)
 * 3) Configuration files
 * 4) Default values set by tool scripts and default values of command line arguments
 *
 * @param commandLineMap Map of configuration from command line arguments
 * @param commandLineKeyValMap Map of configuration from key=value pairs passed via command line (command line properties)
 * @param configMap Map of configuration from configuration files
 * @param toolDefaultMap Map of default values provided by tool scripts or default command line options
 * @return a merged Map of all configuration sources, where higher-priority sources override lower-priority ones
 */
public static Map<String, Object> mergePropertiesWithPrecedence(
    Map<String, Object> commandLineMap,
    Map<String, Object> commandLineKeyValMap,
    Map<String, Object> configMap,
    Map<String, Object> toolDefaultMap
) {
    warnConfigFromMultipleSources(commandLineMap, commandLineKeyValMap, configMap);
    Map<String, Object> map = new HashMap<>();
    // Default values set by tool scripts and default values of command line arguments
    if (toolDefaultMap != null) {
        map.putAll(toolDefaultMap);
    }
    // Configuration file
    if (configMap != null) {
        map.putAll(configMap);
    }
    // Properties passed as key=value pairs via command line (Command line properties)
    if (commandLineKeyValMap != null) {
        map.putAll(commandLineKeyValMap);
    }
    // Command line arguments
    if (commandLineMap != null) {
        map.putAll(commandLineMap);
    }
    return map;
}
/**
 * Print warnings when the same configuration is defined in multiple sources.
 * - Command line arguments override command line properties and configuration files.
 * - Command line properties override configuration files if no command line argument exists.
 *
 * @param commandLineMap Map of configuration from command line arguments
 * @param commandLineKeyValMap Map of configuration from command line properties
 * @param configMap Map of configuration loaded from configuration files
 */
public static void warnConfigFromMultipleSources(
    Map<String, Object> commandLineMap,
    Map<String, Object> commandLineKeyValMap,
    Map<String, Object> configMap
) {
    // Treat a null source as empty, because mergePropertiesWithPrecedence accepts null maps
    Map<String, Object> argMap = commandLineMap == null ? new HashMap<>() : commandLineMap;
    Map<String, Object> keyValMap = commandLineKeyValMap == null ? new HashMap<>() : commandLineKeyValMap;
    Map<String, Object> fileMap = configMap == null ? new HashMap<>() : configMap;

    // Command line argument overrides command line property and config file
    for (Map.Entry<String, Object> entry : argMap.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();
        StringBuilder warning = new StringBuilder();
        if (keyValMap.containsKey(key) && fileMap.containsKey(key)) {
            warning.append(key).append("=").append(value)
                .append(" from command line argument overrides command line property value (")
                .append(keyValMap.get(key)).append(")")
                .append(" and configuration file value (")
                .append(fileMap.get(key)).append(").");
        } else if (keyValMap.containsKey(key)) {
            warning.append(key).append("=").append(value)
                .append(" from command line argument overrides command line property value (")
                .append(keyValMap.get(key)).append(").");
        } else if (fileMap.containsKey(key)) {
            warning.append(key).append("=").append(value)
                .append(" from command line argument overrides configuration file value (")
                .append(fileMap.get(key)).append(").");
        }
        if (!warning.isEmpty()) {
            System.out.println("Warning: " + warning);
        }
    }
    // Command line property overrides config file
    for (Map.Entry<String, Object> entry : keyValMap.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();
        // Already reported by the loop above
        if (argMap.containsKey(key)) continue;
        if (fileMap.containsKey(key)) {
            System.out.println("Warning: " + key + "=" + value
                + " from command line property overrides configuration file value ("
                + fileMap.get(key) + ").");
        }
    }
}
// joptsimple-specific helper method: adds the option's value only if the option was given on the command line
public static <T> void putIfOption(OptionSet options, OptionSpec<T> spec, Map<String, Object> map, String key) {
    if (options.has(spec)) {
        T value = options.valueOf(spec);
        map.put(key, value);
    }
}

// joptsimple-specific helper method: always adds the option's value (its default if the option was not given)
public static <T> void putOption(OptionSet options, OptionSpec<T> spec, Map<String, Object> map, String key) {
    T value = options.valueOf(spec);
    map.put(key, value);
}
Example usage
Map<String, Object> readerProps() throws IOException {
    Map<String, Object> commandLineMap = new HashMap<>();
    putOption(options, topicOpt, commandLineMap, "topic");
    Map<String, Object> configMap = new HashMap<>();
    if (options.has(readerConfigOpt)) {
        configMap.putAll(propsToStringMap(loadProps(options.valueOf(readerConfigOpt))));
    }
    Map<String, Object> commandLineKeyValMap = new HashMap<>();
    if (options.has(readerPropertyOpt)) {
        commandLineKeyValMap.putAll(propsToStringMap(parseKeyValueArgs(options.valuesOf(readerPropertyOpt))));
    }
    return mergePropertiesWithPrecedence(commandLineMap, commandLineKeyValMap, configMap, null);
}

Map<String, Object> producerProps() throws IOException {
    // Prepare the map from command line arguments
    Map<String, Object> commandLineMap = new HashMap<>();
    putIfOption(options, bootstrapServerOpt, commandLineMap, BOOTSTRAP_SERVERS_CONFIG);
    commandLineMap.put(COMPRESSION_TYPE_CONFIG, compressionCodec());
    putIfOption(options, sendTimeoutOpt, commandLineMap, LINGER_MS_CONFIG);
    putIfOption(options, requestRequiredAcksOpt, commandLineMap, ACKS_CONFIG);
    putIfOption(options, requestTimeoutMsOpt, commandLineMap, REQUEST_TIMEOUT_MS_CONFIG);
    putIfOption(options, messageSendMaxRetriesOpt, commandLineMap, RETRIES_CONFIG);
    putIfOption(options, retryBackoffMsOpt, commandLineMap, RETRY_BACKOFF_MS_CONFIG);
    putIfOption(options, socketBufferSizeOpt, commandLineMap, SEND_BUFFER_CONFIG);
    putIfOption(options, maxMemoryBytesOpt, commandLineMap, BUFFER_MEMORY_CONFIG);
    // We currently have 2 options to set the batch.size value. We'll deprecate/remove one of them in KIP-717.
    putIfOption(options, batchSizeOpt, commandLineMap, BATCH_SIZE_CONFIG);
    putIfOption(options, maxPartitionMemoryBytesOpt, commandLineMap, BATCH_SIZE_CONFIG);
    putIfOption(options, metadataExpiryMsOpt, commandLineMap, METADATA_MAX_AGE_CONFIG);
    putIfOption(options, maxBlockMsOpt, commandLineMap, MAX_BLOCK_MS_CONFIG);

    // Properties passed as key=value pairs via command line (Command line properties)
    Map<String, Object> commandLineKeyValMap = new HashMap<>();
    if (options.has(commandPropertyOpt)) {
        commandLineKeyValMap.putAll(Utils.propsToStringMap(
            parseKeyValueArgs(options.valuesOf(commandPropertyOpt))
        ));
    }

    // Configuration files
    Map<String, Object> configMap = new HashMap<>();
    if (options.has(commandConfigOpt)) {
        configMap.putAll(Utils.propsToStringMap(
            Utils.loadProps(options.valueOf(commandConfigOpt))
        ));
    }

    // Default values set by tool scripts and default values of command line arguments
    Map<String, Object> toolDefaultMap = new HashMap<>();
    toolDefaultMap.put(CLIENT_ID_CONFIG, "console-producer");
    toolDefaultMap.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    toolDefaultMap.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    // Since all the options below have default values, we don't need to check for null
    putOption(options, sendTimeoutOpt, toolDefaultMap, LINGER_MS_CONFIG);
    putOption(options, requestRequiredAcksOpt, toolDefaultMap, ACKS_CONFIG);
    putOption(options, requestTimeoutMsOpt, toolDefaultMap, REQUEST_TIMEOUT_MS_CONFIG);
    putOption(options, messageSendMaxRetriesOpt, toolDefaultMap, RETRIES_CONFIG);
    putOption(options, retryBackoffMsOpt, toolDefaultMap, RETRY_BACKOFF_MS_CONFIG);
    putOption(options, socketBufferSizeOpt, toolDefaultMap, SEND_BUFFER_CONFIG);
    putOption(options, maxMemoryBytesOpt, toolDefaultMap, BUFFER_MEMORY_CONFIG);
    putOption(options, batchSizeOpt, toolDefaultMap, BATCH_SIZE_CONFIG);
    putOption(options, maxPartitionMemoryBytesOpt, toolDefaultMap, BATCH_SIZE_CONFIG);
    putOption(options, metadataExpiryMsOpt, toolDefaultMap, METADATA_MAX_AGE_CONFIG);
    putOption(options, maxBlockMsOpt, toolDefaultMap, MAX_BLOCK_MS_CONFIG);

    return mergePropertiesWithPrecedence(commandLineMap, commandLineKeyValMap, configMap, toolDefaultMap);
}
Required Updates to Existing Tools
The following table lists the tools that need to be updated based on the proposed changes.
All of the tools listed below should use the helper methods to ensure they honor the proposed precedence.
| Tool | Needs new "modern" option (properties that do not follow the proposed precedence) | Required arguments to validate from multiple sources | Previous Jira/Email Discussion | Note |
|---|---|---|---|---|
| kafka-acls.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-broker-api-versions.sh | | --bootstrap-server | | |
| kafka-client-metrics.sh | | --bootstrap-server | | |
| kafka-cluster.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-configs.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-console-consumer.sh | For formatter: KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG | --bootstrap-server; --from-beginning (AUTO_OFFSET_RESET_CONFIG); --group (GROUP_ID_CONFIG) is validated but does not follow the proposed validation flow | | |
| kafka-console-producer.sh | KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG, COMPRESSION_TYPE_CONFIG | --bootstrap-server | KAFKA-2526 | |
| kafka-console-share-consumer.sh | For formatter: KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG | --bootstrap-server; --group (GROUP_ID_CONFIG) is validated but does not follow the proposed validation flow | | |
| kafka-consumer-groups.sh | | --bootstrap-server | | |
| kafka-consumer-perf-test.sh | GROUP_ID_CONFIG, RECEIVE_BUFFER_CONFIG, MAX_PARTITION_FETCH_BYTES_CONFIG, AUTO_OFFSET_RESET_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG, CHECK_CRCS_CONFIG | --bootstrap-server | KAFKA-10043 | |
| kafka-delegation-tokens.sh | | --bootstrap-server | | |
| kafka-delete-records.sh | | --bootstrap-server | | |
| kafka-e2e-latency.sh | For consumer: GROUP_ID_CONFIG, ENABLE_AUTO_COMMIT_CONFIG, AUTO_OFFSET_RESET_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG, FETCH_MAX_WAIT_MS_CONFIG. For producer: LINGER_MS_CONFIG, MAX_BLOCK_MS_CONFIG, ACKS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG | --bootstrap-server, --producer-acks (ACKS_CONFIG) | | |
| kafka-features.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-get-offsets.sh | CLIENT_ID_CONFIG | --bootstrap-server | | |
| kafka-groups.sh | | --bootstrap-server | | |
| kafka-leader-election.sh | | --bootstrap-server | | |
| kafka-log-dirs.sh | | --bootstrap-server | | |
| kafka-metadata-quorum.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-producer-perf-test.sh | KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG | | | |
| kafka-reassign-partitions.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-share-consumer-perf-test.sh | GROUP_ID_CONFIG, RECEIVE_BUFFER_CONFIG, MAX_PARTITION_FETCH_BYTES_CONFIG, AUTO_OFFSET_RESET_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG, CHECK_CRCS_CONFIG | --bootstrap-server | | |
| kafka-share-groups.sh | | --bootstrap-server | | |
| kafka-streams-application-reset.sh | | --bootstrap-server | | This tool script is the only one with a default bootstrap-server; consider aligning it with the others. |
| kafka-streams-groups.sh | | --bootstrap-server | | |
| kafka-topics.sh | | --bootstrap-server | | |
| kafka-transactions.sh | | --bootstrap-server | | |
| kafka-verifiable-consumer.sh | GROUP_PROTOCOL_CONFIG, GROUP_REMOTE_ASSIGNOR_CONFIG, PARTITION_ASSIGNMENT_STRATEGY_CONFIG, --group-id (GROUP_ID_CONFIG), ENABLE_AUTO_COMMIT_CONFIG, AUTO_OFFSET_RESET_CONFIG | --bootstrap-server, --group-id (GROUP_ID_CONFIG) | | |
| kafka-verifiable-producer.sh | KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG, ACKS_CONFIG, RETRIES_CONFIG | --bootstrap-server | | |
| kafka-verifiable-share-consumer.sh | | --bootstrap-server, --group-id (GROUP_ID_CONFIG) | | |
Compatibility, Deprecation, and Migration Plan
Impact: Users who rely on the precedence implemented by the current tools, where it differs from the proposed precedence, may be affected.
Deprecation: Changing the configuration precedence could break existing users, so an option is added to each affected tool script to enable or disable the change. Starting with the next major release, Kafka 5.0, the proposed precedence will always be used. The option will be deprecated in Kafka 5.0 and removed in Kafka 6.0.
Test Plan
- Unit tests for helper methods.
- Unit tests verifying that properties are honored according to the proposed precedence order.
- Unit tests for validating required arguments from multiple sources.
Rejected Alternatives
1. Deprecate the "modern" option during the proposed rollout and remove it in the next major release, Kafka 5.0
Reason for Rejection: Scripts in which users have enabled the modern option would fail with fatal errors after upgrading to Kafka 5.0, because the option would no longer exist.
2. Preferring Configuration Files Over Command Line Arguments
Reason for Rejection: Intuitively, users are more likely to expect properties set via command line arguments to take precedence over those defined in configuration files.
3. Allow Each Tool to Have Its Own Configuration Precedence
Reason for Rejection: While this would provide more flexibility for Kafka developers, it would introduce undesirable complexity to the configuration logic, be confusing to users, and increase the potential for bugs.