Status

Current state: Accepted

Discussion thread: here

Vote Thread: here

JIRA: KAFKA-19011

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The `EndToEndLatency` tool is a crucial utility for measuring the latency of data as it travels from a producer to the broker and then to a consumer. It gives Kafka users a direct measure of end-to-end performance on their own clusters. However, the current implementation has several limitations:

  • Fragile Argument Parsing: The tool relies on fixed array indices to retrieve command-line arguments (e.g., args[0], args[1]). This approach is brittle, difficult to maintain, and prone to errors if the argument order changes or new arguments are introduced. For example:

String brokers = args[0];
String topic = args[1];
int numMessages = Integer.parseInt(args[2]);
String acks = args[3];
int messageSizeBytes = Integer.parseInt(args[4]);
Optional<String> propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();

  • Incomplete Test Coverage: The tool can only produce messages without keys or headers. This prevents comprehensive latency testing for common Kafka use cases in which keys are used for partitioning or ordering and headers carry metadata. To broaden testing capabilities, this KIP introduces --record-key-size for the size of message keys, --num-headers for the number of headers per message, --record-header-key-size for the size of header keys, and --record-header-size for the size of header values.

This KIP aims to address these limitations.

Public Interfaces

CLI

The usage of the kafka-e2e-latency.sh script will be updated.

kafka-e2e-latency.sh

Current version with fixed array indices:

USAGE: java org.apache.kafka.tools.EndToEndLatency broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file

New version with argument parser:

USAGE: java org.apache.kafka.tools.EndToEndLatency --bootstrap-server <bootstrap-server> --topic <topic> --num-records <num-records> --producer-acks <producer-acks> --record-size <record-size> [optional] --record-key-size <record-key-size> [optional] --num-headers <num-headers> [optional] --record-header-key-size <record-header-key-size> [optional] --record-header-size <record-header-size> [optional] --command-config <command-config>

The new argument parser will support the following options:

  • --bootstrap-server
  • --topic
  • --num-records
  • --producer-acks
  • --record-size
  • --record-key-size [optional]
  • --num-headers [optional]
  • --record-header-key-size [optional]
  • --record-header-size [optional]
  • --command-config [optional]

Proposed Changes

The kafka-e2e-latency.sh tool will be enhanced with the following improvements:

1. Adopt Argument Parser

The existing fixed-index argument parsing will be replaced with a robust argument parsing library (e.g., jopt-simple, which other Kafka command-line tools already use). This will improve the maintainability, readability, and future extensibility of the command-line interface.
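The sketch below illustrates what named, order-independent options buy over fixed indices. To stay dependency-free it uses a small hand-written parser instead of jopt-simple, and the class and method names (`E2eLatencyArgs`, `parse`, `get`, `getInt`) are hypothetical; the actual tool would declare these options through jopt-simple.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, dependency-free stand-in for a jopt-simple based parser.
final class E2eLatencyArgs {
    static final List<String> REQUIRED = List.of(
            "bootstrap-server", "topic", "num-records", "producer-acks", "record-size");
    static final List<String> OPTIONAL = List.of(
            "record-key-size", "num-headers", "record-header-key-size",
            "record-header-size", "command-config");

    private final Map<String, String> values;

    private E2eLatencyArgs(Map<String, String> values) {
        this.values = values;
    }

    // Parses "--name value" pairs; the position of each pair does not matter.
    static E2eLatencyArgs parse(String... args) {
        Map<String, String> values = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            String flag = args[i];
            String name = flag.startsWith("--") ? flag.substring(2) : "";
            if (!REQUIRED.contains(name) && !OPTIONAL.contains(name)) {
                throw new IllegalArgumentException("Unknown option: " + flag);
            }
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("Missing value for option: " + flag);
            }
            values.put(name, args[i + 1]);
        }
        for (String name : REQUIRED) {
            if (!values.containsKey(name)) {
                throw new IllegalArgumentException("Missing required option: --" + name);
            }
        }
        return new E2eLatencyArgs(values);
    }

    String get(String name) {
        return values.get(name);
    }

    // Integer options; omitted optional sizes and counts default to 0.
    int getInt(String name) {
        return Integer.parseInt(values.getOrDefault(name, "0"));
    }

    Optional<String> commandConfig() {
        return Optional.ofNullable(values.get("command-config"));
    }

    public static void main(String[] args) {
        // Options deliberately given in a different order than the usage line.
        E2eLatencyArgs parsed = parse(
                "--topic", "test-topic", "--record-size", "512",
                "--bootstrap-server", "localhost:9092",
                "--producer-acks", "1", "--num-records", "1000");
        System.out.println(parsed.get("topic") + " " + parsed.getInt("num-records")
                + " " + parsed.getInt("record-key-size")); // prints "test-topic 1000 0"
    }
}
```

Because values are looked up by name, options can appear in any order, a missing required option fails with an explicit message, and adding a new optional flag never shifts the position of an existing one.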

2. Add --record-key-size Option

Introduce a new optional argument to specify the size (in bytes) of the message key; it defaults to `0`, in which case no key is attached. When a size is provided, the tool will generate a key of that size and attach it to each produced record. This option is NOT available with the legacy fixed-index (positional) syntax.
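A minimal sketch of the key generation, assuming random key bytes (the KIP fixes only the key size, not its content) and treating a size of `0` as "no key". The helper `RecordKeys.randomKey` is hypothetical; its result could be passed as the key of a `ProducerRecord<byte[], byte[]>`.

```java
import java.util.Random;

// Hypothetical helper; random key content is an assumption, only the size comes from the KIP.
final class RecordKeys {
    // Returns keySize random bytes, or null (no key) when keySize is 0.
    static byte[] randomKey(Random random, int keySize) {
        if (keySize < 0) {
            throw new IllegalArgumentException("record-key-size must be >= 0, got " + keySize);
        }
        if (keySize == 0) {
            return null;
        }
        byte[] key = new byte[keySize];
        random.nextBytes(key);
        return key;
    }

    public static void main(String[] args) {
        Random random = new Random();
        System.out.println(randomKey(random, 64).length); // prints 64
        System.out.println(randomKey(random, 0) == null); // prints true
    }
}
```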

3. Add --num-headers Option 

Add an optional argument that sets how many headers are added to each message; it defaults to `0`. This option is NOT available with the legacy fixed-index (positional) syntax.

4. Add --record-header-key-size Option

Enable users to specify the size (in bytes) of each header key; it defaults to `0`. This option is NOT available with the legacy fixed-index (positional) syntax.

5. Add --record-header-size Option

Allow users to define the size (in bytes) of each header value; it defaults to `0`. This option is NOT available with the legacy fixed-index (positional) syntax.
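The three header options could combine as in the following sketch, which assumes that `--record-header-key-size` and `--record-header-size` apply to each individual header. Because Kafka header keys are Java `String`s, the sketch builds each key from ASCII letters so that its UTF-8 encoding is exactly the requested number of bytes; values are random bytes. `RecordHeaders`, `randomHeaders`, and `SimpleHeader` are hypothetical, with `SimpleHeader` standing in for Kafka's `RecordHeader(String key, byte[] value)` so the example needs no Kafka dependency.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class RecordHeaders {
    // Hypothetical stand-in for Kafka's RecordHeader(String key, byte[] value).
    record SimpleHeader(String key, byte[] value) { }

    private static final String LETTERS = "abcdefghijklmnopqrstuvwxyz";

    // Assumes the sizes apply to every header: numHeaders headers, each with a
    // keySize-character ASCII key (keySize bytes in UTF-8) and valueSize random bytes.
    static List<SimpleHeader> randomHeaders(Random random, int numHeaders, int keySize, int valueSize) {
        if (numHeaders < 0 || keySize < 0 || valueSize < 0) {
            throw new IllegalArgumentException("header count and sizes must be >= 0");
        }
        List<SimpleHeader> headers = new ArrayList<>(numHeaders);
        for (int i = 0; i < numHeaders; i++) {
            StringBuilder key = new StringBuilder(keySize);
            for (int j = 0; j < keySize; j++) {
                key.append(LETTERS.charAt(random.nextInt(LETTERS.length())));
            }
            byte[] value = new byte[valueSize];
            random.nextBytes(value);
            headers.add(new SimpleHeader(key.toString(), value));
        }
        return headers;
    }

    public static void main(String[] args) {
        List<SimpleHeader> headers = randomHeaders(new Random(), 5, 64, 128);
        SimpleHeader first = headers.get(0);
        // prints "5 headers, key bytes = 64, value bytes = 128"
        System.out.println(headers.size() + " headers, key bytes = "
                + first.key().getBytes(StandardCharsets.UTF_8).length
                + ", value bytes = " + first.value().length);
    }
}
```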

6. Align Argument Naming With Kafka Conventions

Rename the following arguments for consistency with other Kafka tools:

  • broker_list → bootstrap-server
  • num_messages → num-records
  • producer_acks → producer-acks
  • message_size_bytes → record-size
  • properties_file → command-config

Usage

With the argument parser, arguments are named and can be given in any order:

USAGE: java org.apache.kafka.tools.EndToEndLatency --bootstrap-server <bootstrap-server> --topic <topic> --num-records <num-records> --producer-acks <producer-acks> --record-size <record-size> [optional] --record-key-size <record-key-size> [optional] --num-headers <num-headers> [optional] --record-header-key-size <record-header-key-size> [optional] --record-header-size <record-header-size> [optional] --command-config <command-config>

For example:

bin/kafka-e2e-latency.sh --bootstrap-server 172.20.10.3:10686 --topic test-topic --num-records 1000 --producer-acks 1 --record-size 512 --record-key-size 64 --num-headers 5 --record-header-key-size 64 --record-header-size 128

--record-key-size (64): the size of each message key, in bytes.

--num-headers (5): the number of headers per message.

--record-header-key-size (64): the size of each header key, in bytes.

--record-header-size (128): the size of each header value, in bytes.
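Assuming the header sizes apply to each of the headers, the user-supplied bytes carried by each record in the example above add up to record size + key size + num-headers × (header key size + header value size), i.e. 512 + 64 + 5 × (64 + 128) = 1536 bytes. The hypothetical helper below encodes that arithmetic. It deliberately ignores Kafka's record and batch framing (length varints, timestamp and offset deltas, batch headers) and any compression, so the actual bytes written will differ.

```java
// Hypothetical helper: user-supplied bytes per record, excluding Kafka's framing overhead.
final class PayloadSize {
    // Assumes the header key/value sizes apply to each of the numHeaders headers.
    static long userBytesPerRecord(int recordSize, int keySize, int numHeaders,
                                   int headerKeySize, int headerValueSize) {
        long perHeader = (long) headerKeySize + headerValueSize;
        return (long) recordSize + keySize + numHeaders * perHeader;
    }

    public static void main(String[] args) {
        // Values from the example command: 512 + 64 + 5 * (64 + 128)
        System.out.println(userBytesPerRecord(512, 64, 5, 64, 128)); // prints 1536
    }
}
```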

Compatibility, Deprecation, and Migration Plan

This change is designed to be fully backward compatible:

  • The newly introduced options, `--record-key-size`, `--num-headers`, `--record-header-key-size` and `--record-header-size`, are optional.
  • Existing users who do not specify these new options will experience no change in behavior; the tool will function identically to its current state, testing messages without keys or headers.
  • The legacy positional arguments will still be accepted and mapped internally to the corresponding named options, so existing invocations keep working while the named options provide a more user-friendly and less error-prone interface.
  • End-to-End Testing Impact: The existing e2e tests (the EndToEndLatencyService wrapper, benchmark_test, etc.) will continue to work without any changes, since the tool remains backward compatible with the original command-line format.

The previous syntax using positional arguments is deprecated and will be removed from the parser in AK 5.0. When positional arguments are used, a deprecation warning will be displayed to encourage users to move to the new syntax.
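One way the positional-to-named transition could work is sketched below: if the first argument does not start with `--`, the arguments are treated as the legacy positional form, a deprecation warning is printed, and they are rewritten into the equivalent named options. The class name, method names, and warning text are hypothetical; accepting five or six arguments and ignoring a blank `properties_file` mirror the legacy usage line and the current code shown in the Motivation section.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical translation of the deprecated positional syntax into named options.
final class LegacyArgs {
    // Named options replacing the positional arguments, in legacy order.
    private static final String[] NAMED = {
        "--bootstrap-server", "--topic", "--num-records",
        "--producer-acks", "--record-size", "--command-config"
    };

    // Legacy invocations start with the broker list rather than an option flag.
    static boolean isLegacy(String[] args) {
        return args.length > 0 && !args[0].startsWith("--");
    }

    static String[] toNamedArgs(String[] args) {
        if (args.length != 5 && args.length != 6) {
            throw new IllegalArgumentException(
                    "Expected 5 or 6 positional arguments but got " + args.length);
        }
        List<String> named = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            // As in the current code, a blank properties_file is ignored.
            if (i == 5 && args[i].isBlank()) {
                continue;
            }
            named.add(NAMED[i]);
            named.add(args[i]);
        }
        return named.toArray(new String[0]);
    }

    static String[] normalize(String[] args) {
        if (!isLegacy(args)) {
            return args;
        }
        // Hypothetical warning text.
        System.err.println("WARNING: Positional arguments are deprecated and will be removed in AK 5.0; "
                + "please use named options such as --bootstrap-server instead.");
        return toNamedArgs(args);
    }

    public static void main(String[] args) {
        String[] converted = normalize(new String[] {"localhost:9092", "test-topic", "1000", "1", "512"});
        // prints "--bootstrap-server localhost:9092 --topic test-topic --num-records 1000 --producer-acks 1 --record-size 512"
        System.out.println(String.join(" ", converted));
    }
}
```

Rewriting legacy input into named options, rather than keeping two parsing paths, means the new parser remains the single place where values are validated.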

Test Plan

New and updated test cases will be added to `EndToEndLatencyTest.java` to ensure the correct functionality and compatibility of the changes:

  • Key and Header Testing: Test cases will be added to validate the correct behavior when `--record-key-size`, `--num-headers`, `--record-header-key-size` and `--record-header-size` are specified, including scenarios with various size values and combinations.
  • Argument Parser Validation: Comprehensive tests will be implemented to ensure the updated argument parser correctly handles all existing and new options, including default values, error conditions (e.g., missing required arguments), and proper parsing of all data types.
  • Backward Compatibility: Tests will confirm that the tool continues to function correctly when `--record-key-size`, `--num-headers`, `--record-header-key-size` and `--record-header-size` are omitted, matching the current behavior.

Rejected Alternatives

N/A
