Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/vlqvcyl5j95t7w9rk3nnz09d0050yzh9

JIRA: KAFKA-10043

Motivation

Kafka allows users to configure how tool scripts behave through multiple methods:

  • Command line arguments
  • Properties passed as key=value pairs via command line (Command line properties)
  • Configuration files

However, there is no standard, consistent way to determine which source takes precedence when multiple sources are provided. This inconsistency can confuse users and make the system more error-prone.

This KIP aims to establish a clear and uniform configuration precedence across Kafka tool scripts.

Furthermore, the validation of required arguments, such as --bootstrap-server and --bootstrap-controller, should consider multiple sources. For most existing tool scripts, these configurations are validated only by checking if they are provided through command line arguments. However, they could also be specified through command line properties or configuration files.

Public Interfaces

Proposed Configuration Precedence

1) The proposed precedence order for Kafka tool scripts, from highest to lowest priority, is as follows:

  1. Command line arguments
  2. Properties passed as key=value pairs via command line (Command line properties)
  3. Configuration files
  4. Default values set by tool scripts and default values of command line arguments
  5. Default values set by Kafka components

In addition, the tool issues a warning when the same configuration appears in more than one source (command line arguments, command line properties, or configuration file). This helps with troubleshooting when users did not intend to specify the same configuration multiple times.

Note: an entry's value may be an empty string or null, because some configurations allow them. Such a value is used verbatim.
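As a rough illustration of the order above, the following self-contained sketch resolves keys that appear in several sources. The class name PrecedenceSketch, the method resolve, and the sample keys and values are assumptions made for this example; they are not part of the proposed API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: class and method names are hypothetical, not part of the KIP.
final class PrecedenceSketch {

    /**
     * Merges sources that are listed from highest to lowest priority.
     * Sources are applied in reverse order so that higher-priority entries
     * overwrite lower-priority ones. Values, including "" and null, are
     * copied verbatim. A null source is skipped.
     */
    static Map<String, Object> resolve(List<Map<String, Object>> highestFirst) {
        Map<String, Object> merged = new LinkedHashMap<>();
        for (int i = highestFirst.size() - 1; i >= 0; i--) {
            Map<String, Object> source = highestFirst.get(i);
            if (source != null) {
                merged.putAll(source);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> commandLine = new LinkedHashMap<>();
        commandLine.put("acks", "all");

        Map<String, Object> commandLineProperties = new LinkedHashMap<>();
        commandLineProperties.put("acks", "1");
        commandLineProperties.put("linger.ms", ""); // empty value is kept as-is

        Map<String, Object> configFile = new LinkedHashMap<>();
        configFile.put("acks", "0");
        configFile.put("linger.ms", "5");
        configFile.put("retries", "3");

        Map<String, Object> toolDefaults = new LinkedHashMap<>();
        toolDefaults.put("client.id", "console-producer");

        System.out.println(resolve(Arrays.asList(
            commandLine, commandLineProperties, configFile, toolDefaults)));
    }
}
```

In this sample, acks resolves to the command line argument, linger.ms resolves to the empty string from the command line property rather than the file's value, and retries and client.id come from the only sources that define them.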

2) A new option is added for all affected tool scripts not following the proposed precedence:

  • Name: modern
  • Description: This configuration determines whether to enable the configuration precedence proposed in this KIP. It will be deprecated in Kafka 5.0, after which the default behavior will always follow the approach proposed by this KIP. It will then be removed in Kafka 6.0.
  • Type: boolean
  • Default: false
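The sketch below shows one way a tool could gate the new behavior behind this option. It is only an illustration: the class ModernOptionSketch, the method effectiveConfig, and in particular the "legacy" ordering (tool defaults overriding the configuration file) are assumptions chosen for the example. Each real tool has its own current behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: names and the "legacy" ordering are assumptions for this sketch.
final class ModernOptionSketch {

    /** Builds the effective configuration. All maps are assumed to be non-null. */
    static Map<String, Object> effectiveConfig(
        boolean modern,
        Map<String, Object> commandLine,
        Map<String, Object> commandLineProperties,
        Map<String, Object> configFile,
        Map<String, Object> toolDefaults
    ) {
        Map<String, Object> merged = new HashMap<>();
        if (modern) {
            // Proposed order: lowest priority first, so later putAll calls win.
            merged.putAll(toolDefaults);
            merged.putAll(configFile);
            merged.putAll(commandLineProperties);
            merged.putAll(commandLine);
        } else {
            // Hypothetical legacy tool: its defaults are applied after the
            // configuration file and therefore silently override it.
            merged.putAll(configFile);
            merged.putAll(commandLineProperties);
            merged.putAll(toolDefaults);
            merged.putAll(commandLine);
        }
        return merged;
    }
}
```

With the option left at its default of false, such a tool keeps its existing results. Passing true switches the same inputs to the proposed order, which lets users compare both outcomes before the default changes.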

Validating Required Arguments from Multiple Sources

Required arguments (currently --bootstrap-server and --bootstrap-controller) should be validated after considering values from the following sources:

  1. Command line arguments
  2. Properties passed as key=value pairs via command line (Command line properties)
  3. Configuration files

An error should only be raised if the arguments are missing or invalid after checking all three sources.
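A minimal sketch of this validation flow follows. The class RequiredArgumentSketch and the method require are hypothetical names, and the sketch assumes that each source has already been converted to a map keyed by configuration name (for example bootstrap.servers). It also assumes that the value from the highest-priority source that defines the key is the one to validate, in line with the proposed precedence.

```java
import java.util.List;
import java.util.Map;

// Illustrative only: class and method names are hypothetical, not part of the KIP.
final class RequiredArgumentSketch {

    /**
     * Looks up a required key in sources ordered from highest to lowest
     * priority and returns the value from the first source that defines it.
     * Throws only if no source defines the key or the winning value is blank.
     */
    static String require(String key, List<Map<String, Object>> highestFirst) {
        for (Map<String, Object> source : highestFirst) {
            if (source == null || !source.containsKey(key)) {
                continue; // not defined here, check the next source
            }
            Object value = source.get(key);
            if (value == null || value.toString().trim().isEmpty()) {
                throw new IllegalArgumentException(
                    "Invalid value for required configuration: " + key);
            }
            return value.toString();
        }
        throw new IllegalArgumentException("Missing required configuration: " + key);
    }
}
```

A tool would call require with its three source maps after parsing, so that a bootstrap.servers entry supplied only in a configuration file no longer triggers a "missing argument" error.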

Proposed Changes

To ensure that all tool scripts correctly honor the proposed configuration precedence, this KIP introduces helper methods in CommandLineUtils.java and requires all affected tools to use them. This prevents each script from implementing its own logic and ensures consistent behavior.

Since the codebase currently uses three different argument parsers (joptsimple, argparse4j, and AbstractConnectCli), the helper methods are designed to be simple and generic, without depending on any specific external library.

Helper methods

The following code demonstrates the key helper methods:

CommandLineUtils.java
    /**
     * Merge multiple configuration sources according to priority, from highest to lowest:
     * 1) Command line arguments
     * 2) Properties passed as key=value pairs via command line (Command line properties)
     * 3) Configuration files
     * 4) Default values set by tool scripts and default values of command line arguments
     *
     * @param commandLineMap       Map of configuration from command line arguments
     * @param commandLineKeyValMap Map of configuration from key=value pairs passed via command line (command line properties)
     * @param configMap            Map of configuration from configuration files
     * @param toolDefaultMap       Map of default values provided by tool scripts or default command line options
     * @return a merged Map of all configuration sources, where higher-priority sources override lower-priority ones
     */
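    public static Map<String, Object> mergePropertiesWithPrecedence(
        Map<String, Object> commandLineMap,
        Map<String, Object> commandLineKeyValMap,
        Map<String, Object> configMap,
        Map<String, Object> toolDefaultMap
    ) {
        // Any source may be null; pass empty maps so the warning helper never dereferences null
        warnConfigFromMultipleSources(
            commandLineMap == null ? Map.of() : commandLineMap,
            commandLineKeyValMap == null ? Map.of() : commandLineKeyValMap,
            configMap == null ? Map.of() : configMap
        );

        Map<String, Object> map = new HashMap<>();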
        // Default values set by tool scripts and default values of command line arguments
        if (toolDefaultMap != null) {
            map.putAll(toolDefaultMap);
        }
        // Configuration file
        if (configMap != null) {
            map.putAll(configMap);
        }
        // Properties passed as key=value pairs via command line (Command line properties)
        if (commandLineKeyValMap != null) {
            map.putAll(commandLineKeyValMap);
        }
        // Command line arguments
        if (commandLineMap != null) {
            map.putAll(commandLineMap);
        }
        return map;
    }

    /**
     * Print warnings when the same configuration is defined in multiple sources.
     * - Command line arguments override command property values and configuration files.
     * - Command property values override configuration files if no command line argument exists.
     *
     * @param commandLineMap       Map of configuration from command line arguments
     * @param commandLineKeyValMap Map of configuration from command line properties
     * @param configMap            Map of configuration loaded from configuration files
     */
    public static void warnConfigFromMultipleSources(
        Map<String, Object> commandLineMap,
        Map<String, Object> commandLineKeyValMap,
        Map<String, Object> configMap
    ) {
        // Command line argument overrides command property and config file
        for (Map.Entry<String, Object> entry : commandLineMap.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            StringBuilder warning = new StringBuilder();

            if (commandLineKeyValMap.containsKey(key) && configMap.containsKey(key)) {
                warning.append(key).append("=").append(value)
                    .append(" from command line argument overrides command property value (")
                    .append(commandLineKeyValMap.get(key)).append(")")
                    .append(" and configuration file value (")
                    .append(configMap.get(key)).append(").");
            } else if (commandLineKeyValMap.containsKey(key)) {
                warning.append(key).append("=").append(value)
                    .append(" from command line argument overrides command property value (")
                    .append(commandLineKeyValMap.get(key)).append(").");
            } else if (configMap.containsKey(key)) {
                warning.append(key).append("=").append(value)
                    .append(" from command line overrides configuration file value (")
                    .append(configMap.get(key)).append(").");
            }

            if (!warning.isEmpty()) {
                System.out.println("Warning: " + warning);
            }
        }

        // Command property override config file
        for (Map.Entry<String, Object> entry : commandLineKeyValMap.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();
            if (commandLineMap.containsKey(key)) continue;

            if (configMap.containsKey(key)) {
                System.out.println("Warning: " + key + "=" + value
                    + " from command property overrides configuration file value ("
                    + configMap.get(key) + ").");
            }
        }
    }

    // joptsimple-specific helper: copies the option's value into the map only if the option was given on the command line
    public static <T> void putIfOption(OptionSet options, OptionSpec<T> spec, Map<String, Object> map, String key) {
        if (options.has(spec)) {
            T value = options.valueOf(spec);
            map.put(key, value);
        }
    }

    // joptsimple-specific helper: always copies the option's value (explicit or default) into the map
    public static <T> void putOption(OptionSet options, OptionSpec<T> spec, Map<String, Object> map, String key) {
        T value = options.valueOf(spec);
        map.put(key, value);
    }
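
To make it easier to see which keys trigger a warning, the following sketch reports every key that is defined in more than one source instead of printing messages. It is a simplified variation for illustration, not the helper proposed above: the class DuplicateKeySketch, the method duplicates, and the source labels are assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: class and method names are hypothetical, not part of the KIP.
final class DuplicateKeySketch {

    /**
     * For every key defined in more than one source, returns the labels of
     * the sources that define it, in the iteration order of the input map
     * (pass a LinkedHashMap ordered from highest to lowest priority).
     * Null sources are treated as empty.
     */
    static Map<String, List<String>> duplicates(Map<String, Map<String, Object>> namedSources) {
        Map<String, List<String>> seen = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Object>> source : namedSources.entrySet()) {
            if (source.getValue() == null) {
                continue;
            }
            for (String key : source.getValue().keySet()) {
                seen.computeIfAbsent(key, k -> new ArrayList<>()).add(source.getKey());
            }
        }
        // Keep only the keys that appear in at least two sources.
        seen.values().removeIf(labels -> labels.size() < 2);
        return seen;
    }
}
```

The first label in each returned list identifies the source whose value wins under the proposed precedence, which is the information the warning messages convey.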


Example usage

ConsoleProducer.java
        Map<String, Object> readerProps() throws IOException {
            Map<String, Object> commandLineMap = new HashMap<>();
            putOption(options, topicOpt, commandLineMap, "topic");

            Map<String, Object> configMap = new HashMap<>();
            if (options.has(readerConfigOpt)) {
                configMap.putAll(propsToStringMap(loadProps(options.valueOf(readerConfigOpt))));
            }

            Map<String, Object> commandLineKeyValMap = new HashMap<>();
            if (options.has(readerPropertyOpt)) {
                commandLineKeyValMap.putAll(propsToStringMap(parseKeyValueArgs(options.valuesOf(readerPropertyOpt))));
            }
            return mergePropertiesWithPrecedence(commandLineMap, commandLineKeyValMap, configMap, null);
        }
        
        Map<String, Object> producerProps() throws IOException {
            // Prepare the map from command line arguments
            Map<String, Object> commandLineMap = new HashMap<>();
            putIfOption(options, bootstrapServerOpt, commandLineMap, BOOTSTRAP_SERVERS_CONFIG);
            commandLineMap.put(COMPRESSION_TYPE_CONFIG, compressionCodec());
            putIfOption(options, sendTimeoutOpt, commandLineMap, LINGER_MS_CONFIG);
            putIfOption(options, requestRequiredAcksOpt, commandLineMap, ACKS_CONFIG);
            putIfOption(options, requestTimeoutMsOpt, commandLineMap, REQUEST_TIMEOUT_MS_CONFIG);
            putIfOption(options, messageSendMaxRetriesOpt, commandLineMap, RETRIES_CONFIG);
            putIfOption(options, retryBackoffMsOpt, commandLineMap, RETRY_BACKOFF_MS_CONFIG);
            putIfOption(options, socketBufferSizeOpt, commandLineMap, SEND_BUFFER_CONFIG);
            putIfOption(options, maxMemoryBytesOpt, commandLineMap, BUFFER_MEMORY_CONFIG);
            // We currently have 2 options to set the batch.size value. We'll deprecate/remove one of them in KIP-717.
            putIfOption(options, batchSizeOpt, commandLineMap, BATCH_SIZE_CONFIG);
            putIfOption(options, maxPartitionMemoryBytesOpt, commandLineMap, BATCH_SIZE_CONFIG);
            putIfOption(options, metadataExpiryMsOpt, commandLineMap, METADATA_MAX_AGE_CONFIG);
            putIfOption(options, maxBlockMsOpt, commandLineMap, MAX_BLOCK_MS_CONFIG);

            // Properties passed as key=value pairs via command line (Command line properties)
            Map<String, Object> commandLineKeyValMap = new HashMap<>();
            if (options.has(commandPropertyOpt)) {
                commandLineKeyValMap.putAll(Utils.propsToStringMap(
                        parseKeyValueArgs(options.valuesOf(commandPropertyOpt))
                ));
            }
            // Configuration files
            Map<String, Object> configMap = new HashMap<>();
            if (options.has(commandConfigOpt)) {
                configMap.putAll(Utils.propsToStringMap(
                        Utils.loadProps(options.valueOf(commandConfigOpt))
                ));
            }

            // Default values set by tool scripts and default values of command line arguments
            Map<String, Object> toolDefaultMap = new HashMap<>();
            toolDefaultMap.put(CLIENT_ID_CONFIG, "console-producer");
            toolDefaultMap.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            toolDefaultMap.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            // Since all the options below have default values, we don't need to check for null
            putOption(options, sendTimeoutOpt, toolDefaultMap, LINGER_MS_CONFIG);
            putOption(options, requestRequiredAcksOpt, toolDefaultMap, ACKS_CONFIG);
            putOption(options, requestTimeoutMsOpt, toolDefaultMap, REQUEST_TIMEOUT_MS_CONFIG);
            putOption(options, messageSendMaxRetriesOpt, toolDefaultMap, RETRIES_CONFIG);
            putOption(options, retryBackoffMsOpt, toolDefaultMap, RETRY_BACKOFF_MS_CONFIG);
            putOption(options, socketBufferSizeOpt, toolDefaultMap, SEND_BUFFER_CONFIG);
            putOption(options, maxMemoryBytesOpt, toolDefaultMap, BUFFER_MEMORY_CONFIG);
            putOption(options, batchSizeOpt, toolDefaultMap, BATCH_SIZE_CONFIG);
            putOption(options, maxPartitionMemoryBytesOpt, toolDefaultMap, BATCH_SIZE_CONFIG);
            putOption(options, metadataExpiryMsOpt, toolDefaultMap, METADATA_MAX_AGE_CONFIG);
            putOption(options, maxBlockMsOpt, toolDefaultMap, MAX_BLOCK_MS_CONFIG);

            return mergePropertiesWithPrecedence(commandLineMap, commandLineKeyValMap, configMap, toolDefaultMap);
        }


Required Updates to Existing Tools

The following table lists the tools that need to be updated based on the proposed changes.

All of the tools listed below should use the helper methods to ensure they honor the proposed precedence.

| Tool | Needs new "modern" option; properties that do not follow the proposed precedence | Required arguments to validate from multiple sources | Previous JIRA/email discussion | Note |
|---|---|---|---|---|
| kafka-acls.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-broker-api-versions.sh | | --bootstrap-server | | |
| kafka-client-metrics.sh | | --bootstrap-server | | |
| kafka-cluster.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-configs.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-console-consumer.sh | For formatter: KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG | --bootstrap-server; --from-beginning (AUTO_OFFSET_RESET_CONFIG); --group (GROUP_ID_CONFIG) is validated but does not follow the proposed validation flow | | |
| kafka-console-producer.sh | KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG, COMPRESSION_TYPE_CONFIG | --bootstrap-server | KAFKA-2526 | |
| kafka-console-share-consumer.sh | For formatter: KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG | --bootstrap-server; --group (GROUP_ID_CONFIG) is validated but does not follow the proposed validation flow | | |
| kafka-consumer-groups.sh | | --bootstrap-server | | |
| kafka-consumer-perf-test.sh | GROUP_ID_CONFIG, RECEIVE_BUFFER_CONFIG, MAX_PARTITION_FETCH_BYTES_CONFIG, AUTO_OFFSET_RESET_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG, CHECK_CRCS_CONFIG | --bootstrap-server | KAFKA-10043 | |
| kafka-delegation-tokens.sh | | --bootstrap-server | | |
| kafka-delete-records.sh | | --bootstrap-server | | |
| kafka-e2e-latency.sh | For consumer: GROUP_ID_CONFIG, ENABLE_AUTO_COMMIT_CONFIG, AUTO_OFFSET_RESET_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG, FETCH_MAX_WAIT_MS_CONFIG. For producer: LINGER_MS_CONFIG, MAX_BLOCK_MS_CONFIG, ACKS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG | --bootstrap-server, --producer-acks (ACKS_CONFIG) | | |
| kafka-features.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-get-offsets.sh | CLIENT_ID_CONFIG | --bootstrap-server | | |
| kafka-groups.sh | | --bootstrap-server | | |
| kafka-leader-election.sh | | --bootstrap-server | | |
| kafka-log-dirs.sh | | --bootstrap-server | | |
| kafka-metadata-quorum.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-producer-perf-test.sh | KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG | | | |
| kafka-reassign-partitions.sh | | --bootstrap-server, --bootstrap-controller | | |
| kafka-share-consumer-perf-test.sh | GROUP_ID_CONFIG, RECEIVE_BUFFER_CONFIG, MAX_PARTITION_FETCH_BYTES_CONFIG, AUTO_OFFSET_RESET_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG, CHECK_CRCS_CONFIG | --bootstrap-server | | |
| kafka-share-groups.sh | | --bootstrap-server | | |
| kafka-streams-application-reset.sh | | --bootstrap-server | | This tool script is the only one with a default bootstrap-server; consider aligning it with the others. |
| kafka-streams-groups.sh | | --bootstrap-server | | |
| kafka-topics.sh | | --bootstrap-server | | |
| kafka-transactions.sh | | --bootstrap-server | | |
| kafka-verifiable-consumer.sh | GROUP_PROTOCOL_CONFIG, GROUP_REMOTE_ASSIGNOR_CONFIG, PARTITION_ASSIGNMENT_STRATEGY_CONFIG, --group-id (GROUP_ID_CONFIG), ENABLE_AUTO_COMMIT_CONFIG, AUTO_OFFSET_RESET_CONFIG | --bootstrap-server, --group-id (GROUP_ID_CONFIG) | | |
| kafka-verifiable-producer.sh | KEY_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG, ACKS_CONFIG, RETRIES_CONFIG | --bootstrap-server | | |
| kafka-verifiable-share-consumer.sh | | --bootstrap-server, --group-id (GROUP_ID_CONFIG) | | |




Compatibility, Deprecation, and Migration Plan

Impact: Existing users who rely on the precedence of the current tool implementations, rather than the proposed precedence, may be affected.

Deprecation: Changing configuration precedence might break existing usage. For this reason, this KIP adds an option ("modern") to each affected tool script to enable or disable the change. The proposed precedence becomes the only behavior in the next major release, Kafka 5.0. The option is deprecated in Kafka 5.0 at the same time and removed in Kafka 6.0.

Test Plan

  • Unit tests for helper methods.
  • Unit tests verifying that properties are honored according to the proposed precedence order.
  • Unit tests for validating required arguments from multiple sources.

Rejected Alternatives

1. Deprecate the "modern" option during the proposed rollout and remove it in the next major release, Kafka 5.0

Reason for Rejection: Scripts that pass the "modern" option would fail with a fatal error after upgrading to Kafka 5.0, because the option would no longer exist.

2. Preferring Configuration Files Over Command Line Arguments

Reason for Rejection: Intuitively, users are more likely to expect properties set via command line arguments to take precedence over those defined in configuration files.

3. Allow Each Tool to Have Its Own Configuration Precedence

Reason for Rejection: While this would provide more flexibility for Kafka developers, it would introduce undesirable complexity to the configuration logic, be confusing to users, and increase the potential for bugs.



