...
Status
Current state: Rejected
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The options accepted by the kafka-configs.sh command will change:

- --zookeeper will be deprecated, which means it will display a warning message saying that it is ignored.
- --bootstrap-server option will be added: it has a parameter which is an endpoint to a broker (or a comma separated list of brokers).
- --command-config option will be added: a file path to an admin client configuration properties file.
- functionality won't be available. The design decision behind this is that the ConfigCommand tool will be rewritten in the tools module, which doesn't depend on the core module. This makes it hard to provide backward compatibility with the current ConfigCommand.
- --bootstrap-server was added in KAFKA-6494 and will be used here.
- --adminclient.config option will be added and should be used similarly to other tools, such as --producer.config in the console-producer. This parses a config file and initializes the admin client used internally.
- --adminclient-property option will be added. It is a key=value list that will be parsed by the command. It initializes the internal admin client.
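For illustration, the key=value list accepted by --adminclient-property could be parsed along these lines (the parser class below is a hypothetical sketch, not part of the KIP):

```java
import java.util.Properties;

public class AdminClientPropertyParser {
    /**
     * Parses a comma separated key=value list, e.g.
     * "bootstrap.servers=localhost:9092,request.timeout.ms=5000",
     * into Properties used to initialize the internal admin client.
     * Hypothetical helper for illustration only.
     */
    public static Properties parse(String propertyList) {
        Properties props = new Properties();
        if (propertyList == null || propertyList.isEmpty())
            return props;
        for (String pair : propertyList.split(",")) {
            String[] kv = pair.split("=", 2); // split on the first '=' only
            if (kv.length != 2 || kv[0].trim().isEmpty())
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            props.setProperty(kv[0].trim(), kv[1].trim());
        }
        return props;
    }
}
```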
A new tool, called scram-credentials.sh, will be added. The need for this tool arises because when people use ZooKeeper as a credentials store for SCRAM (and currently users have no other option), direct interaction with ZooKeeper is required to set up the initial credentials for inter-broker communication. The functionality of the tool will cover the following:
- Add, remove and list SCRAM credentials directly in ZooKeeper.
- Will continue to use the --zookeeper option to specify the ZooKeeper host.
- Works similarly to the old config command. Examples:
  - Add new credentials: bin/scram-credentials.sh --zookeeper localhost:2181 --add 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' --username alice
  - Describe credentials: bin/scram-credentials.sh --zookeeper localhost:2181 --describe --username alice
  - Remove credentials: bin/scram-credentials.sh --zookeeper localhost:2181 --delete 'SCRAM-SHA-512' --username alice
Protocol Changes
KIP-133 introduced the describe and alter admin protocols and KIP-140 a wire format representation for ResourceType. We will modify these to accommodate the new requirements.
Wire Format Types
ResourceType
0: Unknown
1: Any
2: Topic
3: Group
4: Broker
5: User (new)
6: Client (new)
QuotaType (new)
0: ProducerByteRate
1: ConsumerByteRate
2: RequestPercentage
QuotaSource (new)
0: ClientInUser
1: DefaultClientInUser
2: User
3: ClientInDefaultUser
4: DefaultClientInDefaultUser
5: DefaultUser
6: Client
7: DefaultClient
AdminOperation (new)
0: Add
1: Set
2: Delete
Field Types
Double
A new type needs to be added to transfer quota values. Since the protocol classes in Kafka already use ByteBuffers, it is logical to use their functionality for serializing doubles. The serialization is simply a representation of the specified floating-point value according to the IEEE 754 floating-point "double format" bit layout. The ByteBuffer serializer writes eight bytes containing the given double value, in big-endian byte order, into the buffer at the current position, and then increments the position by eight.
The implementation will be defined in org.apache.kafka.common.protocol.types with the other protocol types, and it will have no default value, much like the other types available in the protocol.
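As a sketch of the serialization above (the class and method names are illustrative, not the actual org.apache.kafka.common.protocol.types implementation):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class DoubleField {
    /**
     * Writes the IEEE 754 "double format" bits in big-endian (network) byte
     * order and advances the buffer position by eight bytes.
     */
    public static void write(ByteBuffer buffer, double value) {
        buffer.order(ByteOrder.BIG_ENDIAN); // the protocol's byte order
        buffer.putDouble(value);
    }

    /** Reads eight bytes back into a double, advancing the position by eight. */
    public static double read(ByteBuffer buffer) {
        buffer.order(ByteOrder.BIG_ENDIAN);
        return buffer.getDouble();
    }
}
```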
Describe Quotas
The justification for a new protocol is that a quota is quite different from a broker or topic config: a quota can be identified by a user, a client, or even a (user, client) tuple, while a topic or broker config can be identified only by the topic's name or the broker's ID. Moreover, quotas have their own well-defined types.
Code Block

DescribeQuotas Request (Version: 0) => [resource]
  resource => [quota_resource] [quota_type]
    quota_resource => type name
      type => INT8
      name => STRING
    quota_type => INT8
Request semantics:
- Can be sent to any broker.
- If the name is empty, it means that listing the default quota is requested. Responses are returned the same way for defaults.
- If the quota_type array is empty, all quotas are returned. Otherwise, quotas with the provided types are returned.
- Authorization: "DescribeQuotas" can only be interpreted on the "Cluster" resource and is represented by the DescribeConfigs ACL due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
Code Block

DescribeQuotas Response (Version: 0) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_resource] [quota]
    quota_resource => type name
      type => INT8
      name => STRING
    quota_collection => error_code error_message [quota_entry]
      error_code => INT16
      error_message => NULLABLE_STRING
      quota_entry => quota_type quota_value quota_source
        quota_type => INT8
        quota_value => DOUBLE
        quota_source => INT8
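The quota_source field tells the caller which override level produced the returned value. Assuming the QuotaSource values above are listed in precedence order (most specific first), the resolution can be sketched like this (hypothetical helper; the ZooKeeper-style keys are illustrative):

```java
import java.util.Map;

public class QuotaResolver {
    /**
     * Returns the most specific override key that has a quota configured,
     * walking candidates in the same order as the QuotaSource values 0..7.
     * Returns null when no quota applies.
     */
    public static String resolve(Map<String, Double> overrides, String user, String client) {
        String[] candidates = {
            "users/" + user + "/clients/" + client,   // 0: ClientInUser
            "users/" + user + "/clients/<default>",   // 1: DefaultClientInUser
            "users/" + user,                          // 2: User
            "users/<default>/clients/" + client,      // 3: ClientInDefaultUser
            "users/<default>/clients/<default>",      // 4: DefaultClientInDefaultUser
            "users/<default>",                        // 5: DefaultUser
            "clients/" + client,                      // 6: Client
            "clients/<default>"                       // 7: DefaultClient
        };
        for (String key : candidates)
            if (overrides.containsKey(key))
                return key; // first (most specific) match wins
        return null;
    }
}
```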
Alter Quotas
Code Block

AlterQuotas Request (Version: 0) => validate_only [resource]
  validate_only => BOOLEAN
  resource => [quota_resource] [quota]
    quota_resource => type name
      type => INT8
      name => STRING
    quota => quota_type quota_value
      quota_type => INT8
      quota_value => DOUBLE
Request Semantics
- Can be sent to any broker.
- If the name is empty, it means that altering a default quota is requested.
- Authorization: "AlterQuotas" can only be interpreted on the "Cluster" resource and is represented by the AlterConfigs ACL due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
- For tools that allow users to alter quota configs, a validation/dry-run mode where validation errors are reported but no creation is attempted is available via the validate_only parameter.
- The AlterQuotas protocol has incremental semantics: the request updates only those quotas which are sent in the request.
- Removing a quota is done by sending NaN as the value.
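A minimal sketch of these request semantics (a hypothetical helper, not broker code): only quotas present in the request are touched, and NaN deletes a quota:

```java
import java.util.HashMap;
import java.util.Map;

public class AlterQuotasSemantics {
    /**
     * Applies an AlterQuotas request to the current quotas: entries in the
     * request overwrite existing values, NaN removes the quota, and quotas
     * not mentioned in the request are left untouched (incremental semantics).
     */
    public static Map<String, Double> apply(Map<String, Double> current, Map<String, Double> request) {
        Map<String, Double> result = new HashMap<>(current); // untouched quotas are kept
        for (Map.Entry<String, Double> e : request.entrySet()) {
            if (Double.isNaN(e.getValue()))
                result.remove(e.getKey());   // NaN means "remove this quota"
            else
                result.put(e.getKey(), e.getValue());
        }
        return result;
    }
}
```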
Code Block

AlterQuotas Response (Version: 0) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_resource] [quota]
    quota_resource => type name
      type => INT8
      name => STRING
    quota => error_code error_message quota_type
      error_code => INT16
      error_message => NULLABLE_STRING
      quota_type => INT8
AlterConfigs
This request needs some change, as currently the --add-config operation of ConfigCommand does incremental updates in ZooKeeper while the AlterConfigs protocol sets the whole properties object. The purpose of this change is to add a boolean parameter to the request so that it can specify which behavior (incremental or set) should be executed.
Code Block

AlterConfigs Request (Version: 1) => [resources] validate_only incremental_update
  validate_only => BOOLEAN
  incremental_update => BOOLEAN  // new addition
  resources => resource_type resource_name [configs]
    resource_type => INT8
    resource_name => STRING
    configs => config_name config_value
      config_name => STRING
      config_value => STRING
Request Semantics:
- The default value of incremental_update is false. That means that the request will wipe the node's data and set exactly what is sent in the request.
- Setting the incremental_update flag to true makes sure that existing configs are not deleted.
- Deleting a config in incremental mode is done by sending an empty string as the value.
- Other existing semantics aren't changed.
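These semantics can be sketched as follows (a hypothetical helper, not broker code):

```java
import java.util.HashMap;
import java.util.Map;

public class AlterConfigsSemantics {
    /**
     * With incremental_update=false the existing configs are wiped and replaced
     * by the request; with true they are merged, and an empty string value
     * deletes a key.
     */
    public static Map<String, String> apply(Map<String, String> current,
                                            Map<String, String> request,
                                            boolean incrementalUpdate) {
        Map<String, String> result = incrementalUpdate ? new HashMap<>(current) : new HashMap<>();
        for (Map.Entry<String, String> e : request.entrySet()) {
            if (incrementalUpdate && e.getValue().isEmpty())
                result.remove(e.getKey());  // empty string deletes in incremental mode
            else
                result.put(e.getKey(), e.getValue());
        }
        return result;
    }
}
```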
DescribeConfigs and AlterConfigs (SCRAM)
To enable describing and altering SCRAM credentials we will use the DescribeConfigs and AlterConfigs protocols. There are no changes in the protocols' structure, but we will allow the USER resource type to be passed in the protocol. When this happens, the server will know that SCRAM configs are requested and will send them in the response. In the case of AlterConfigs, if a USER resource type is passed, it will validate that only SCRAM credentials are changed. If not, it will fail with InvalidRequestException.
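The USER resource validation could look roughly like this (a hypothetical helper; the mechanism names come from the SCRAM credential examples above, and IllegalArgumentException stands in for InvalidRequestException):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ScramConfigValidator {
    private static final Set<String> SCRAM_MECHANISMS =
        new HashSet<>(Arrays.asList("SCRAM-SHA-256", "SCRAM-SHA-512"));

    /**
     * For a USER resource, AlterConfigs may only change SCRAM credentials;
     * anything else should be rejected.
     */
    public static void validateUserConfigs(Set<String> configNames) {
        for (String name : configNames)
            if (!SCRAM_MECHANISMS.contains(name))
                throw new IllegalArgumentException(
                    "Only SCRAM credentials can be altered for USER resources: " + name);
    }
}
```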
AlterConfigs
Code Block

AlterConfigs Request (Version: 1) => [resources] validate_only
  validate_only => BOOLEAN
  resources => resource_type resource_name [configs]
    resource_type => INT8
    resource_name => STRING
    configs => config_name config_value config_operation
      config_name => STRING
      config_value => STRING
      config_operation => INT8  // new addition
AdminClient APIs
Code Block

public static class Quota {
    public QuotaType type();
    public double value();
    public QuotaSource source();
}

public enum QuotaType {
    PRODUCER_BYTE_RATE((byte) 0, "producer_byte_rate"),
    CONSUMER_BYTE_RATE((byte) 1, "consumer_byte_rate"),
    REQUEST_PERCENTAGE((byte) 2, "request_percentage");

    QuotaType(byte id, String name);
    public byte id();
    public String quotaName();
}

public enum QuotaSource {
    CLIENT_OF_USER((byte) 0, "Client of user"),
    DEFAULT_CLIENT_OF_USER((byte) 1, "Default client of user"),
    USER((byte) 2, "User"),
    CLIENT_OF_DEFAULT_USER((byte) 3, "Client of default user"),
    DEFAULT_CLIENT_OF_DEFAULT_USER((byte) 4, "Default client of default user"),
    DEFAULT_USER((byte) 5, "Default user"),
    CLIENT((byte) 6, "Client"),
    DEFAULT_CLIENT((byte) 7, "Default client");

    QuotaSource(byte id, String description);
    public byte id();
    public String description();
}

/**
 * Makes sure that the list of resources that is used as a key in a hashmap
 * is immutable and has a fixed implementation for the hashCode.
 */
public class ConfigResourceList {
    public List<ConfigResource> getResourceList();
}

public class AdminClient {
    public DescribeQuotasResult describeQuotas(Map<ConfigResourceList, Collection<QuotaType>> quotaTypes, DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(Map<ConfigResourceList, Collection<Quota>> configs, AlterQuotasOptions options);
}

public class DescribeQuotasOptions extends AbstractOptions<DescribeQuotasOptions> {
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}

public class DescribeQuotasResult {
    public Map<List<Resource>, KafkaFuture<Collection<Quota>>> values();
}

public class AlterQuotasOptions extends AbstractOptions<AlterQuotasOptions> {
    public AlterQuotasOptions timeoutMs(Integer timeout);
    public AlterQuotasOptions validateOnly(boolean validateOnly);
}

public class AlterQuotasResult {
    public Map<List<Resource>, KafkaFuture<Void>> results();
}

public class AlterConfigsOptions extends AbstractOptions<AlterConfigsOptions> {
    public AlterConfigsOptions timeoutMs(Integer timeoutMs);
    public AlterConfigsOptions validateOnly(boolean validateOnly);
    public boolean shouldValidateOnly();
    public AlterConfigsOptions incrementalUpdate(boolean incrementalUpdate); // new
    public boolean shouldUpdateIncrementally();                              // new
}
Request API
Code Block

public class QuotaCollection {
    public QuotaCollection(ApiError error, Collection<Quota> entries);
    public QuotaCollection(Collection<Quota> entries);
    public ApiError error();
    public Collection<Quota> entries();
}

public enum AdminOperation {
    ADD((byte) 0),
    SET((byte) 1),
    DELETE((byte) 2);

    AdminOperation(byte id);
    public byte id();
}

public class DescribeQuotasRequest extends AbstractRequest {
    public static Schema[] schemaVersions();
    public static DescribeQuotasRequest parse(ByteBuffer buffer, short version);

    public static class Builder extends AbstractRequest.Builder {
        public Builder(Map<List<Resource>, Collection<QuotaType>> quotaSettings);
        public DescribeQuotasRequest build(short version);
    }

    public DescribeQuotasRequest(short version, Map<List<Resource>, Collection<QuotaType>> quotaSettings);
    public DescribeQuotasRequest(Struct struct, short version);
    public Map<List<Resource>, Collection<QuotaType>> quotaTypes();
}

public class DescribeQuotasResponse extends AbstractResponse {
    public static Schema[] schemaVersions();
    public DescribeQuotasResponse(int throttleTimeMs, Map<List<ConfigResource>, KafkaFuture<Collection<Quota>>> quotas);
    public DescribeQuotasResponse(Struct struct);
    public Map<List<Resource>, QuotaCollection> quotas();
}

public class AlterQuotasRequest extends AbstractRequest {
    public static Schema[] schemaVersions();

    public static class Builder extends AbstractRequest.Builder {
        public Builder(Map<List<Resource>, QuotaCollection> quotaSettings);
        public AlterQuotasRequest build(short version);
    }

    public AlterQuotasRequest(short version, Map<List<Resource>, QuotaCollection> quotas, boolean validateOnly);
    public AlterQuotasRequest(Struct struct, short version);
    public Map<List<Resource>, QuotaCollection> quotas();
}

public class AlterQuotasResponse extends AbstractResponse {
    public static Schema[] schemaVersions();
    public AlterQuotasResponse(int throttleTimeMs, Map<List<Resource>, ApiError> quotas);
    public AlterQuotasResponse(Struct struct);
    public Map<List<Resource>, ApiError> errors();
    public int throttleTimeMs();
}
New Command Line Interface
The kafka-configs.sh command line interface will change a little in terms of help message and response format, as we will use argparse4j for parsing arguments.
Help Message
No Format

usage: config-command [-h] --entity-type {topics,clients,users,brokers}
                      [--force FORCE] [--add-config ADDCONFIG]
                      [--delete-config DELETECONFIG]
                      (--entity-name ENTITYNAME | --entity-default)
                      (--describe | --alter)
                      (--bootstrap-server BOOTSTRAPSERVERS |
                       --adminclient.config CONFIGPROPERTIES |
                       --adminclient-property ADMINCLIENTPROPERTY)

Change configs for topics, clients, users, brokers dynamically.

optional arguments:
  -h, --help             show this help message and exit
  --entity-type {topics,clients,users,brokers}
                         REQUIRED: the type of entity
                         (topics/clients/users/brokers)
  --force FORCE          Suppresses console prompts
  --add-config ADDCONFIG
                         Key Value pairs of configs to add. Square brackets
                         can be used to group values which contain commas:
                         'k1=v1,k2=[v1,v2,v2],k3=v3'.
  --delete-config DELETECONFIG
                         Config keys to remove in the following form:
                         'k1,k2'.

  You can specify only one in --entity-name and --entity-default
  --entity-name ENTITYNAME
                         Name of entity (client id/user principal name)
  --entity-default       Default entity name for clients/users (applies to
                         corresponding entity type in command line)

  You can specify only one in --alter, --describe
  --describe             List configs for the given entity. (default: false)
  --alter                Alter the configuration for the entity.
                         (default: false)

  REQUIRED. You can specify only one in --bootstrap-servers,
  --adminclient.config
  --bootstrap-server BOOTSTRAPSERVER
                         The broker list string in the form
                         HOST1:PORT1,HOST2:PORT2.
  --command-config COMMANDCONFIG
                         The config properties file for the Admin Client.
Output Format
No Format

CONFIGS FOR TOPIC topicA

Name                                                          Sensitive  Read-only  Source
compression.type = producer                                   false      false      Default config
message.format.version = 1.0-IV0                              false      false      Default config
file.delete.delay.ms = 60000                                  false      false      Dynamic topic config
leader.replication.throttled.replicas =                       false      false      Default config
max.message.bytes = 1000012                                   false      false      Default config
min.compaction.lag.ms = 0                                     false      false      Default config
message.timestamp.type = CreateTime                           false      false      Default config
min.insync.replicas = 1                                       false      false      Default config
segment.jitter.ms = 0                                         false      false      Default config
preallocate = false                                           false      false      Default config
index.interval.bytes = 4096                                   false      false      Default config
min.cleanable.dirty.ratio = 0.5                               false      false      Default config
unclean.leader.election.enable = false                        false      false      Default config
retention.bytes = 10                                          false      false      Dynamic topic config
delete.retention.ms = 86400000                                false      false      Default config
cleanup.policy = delete                                       false      false      Default config
flush.ms = 9223372036854775807                                false      false      Default config
follower.replication.throttled.replicas =                     false      false      Default config
segment.bytes = 1073741824                                    false      false      Default config
retention.ms = 604800000                                      false      false      Default config
segment.ms = 604800000                                        false      false      Default config
message.timestamp.difference.max.ms = 9223372036854775807     false      false      Default config
flush.messages = 9223372036854775807                          false      false      Default config
segment.index.bytes = 10485760                                false      false      Default config
producer.byte.rate = 1000                                     false      false      Default user
As seen above, the describe format becomes more organized, and it will also return default properties (as the protocol currently supports that). In the case of alter, we will also do an extra describe after executing the alter and print the freshest state.
Compatibility, Deprecation, And Migration Plan
Compatibility
Firstly, the behavior of the --zookeeper command line parameter will change: after this change it will print a warning message saying that it is ignored. Therefore every user will need to change --zookeeper to --bootstrap-server, --adminclient-property or --adminclient.config. SCRAM updates will be done by the newly introduced scram-credentials.sh tool. Other existing behavior will be kept.
Secondly, as of this KIP users will be able to describe all topics or brokers in one step, but can't do it for clients and users. Those who have this use case will still need to use the old command for a while (see below). The reason for this is that the MetadataRequest currently provides enough information about topics and brokers to describe all of them in one step, but there is no such information about clients and users.
Finally, backward compatibility (for instance a 1.2 client wants to admin a 1.0 server) will be impacted, as some of the protocols are newly created and don't exist in old servers. In these cases users should continue using the scala version of the ConfigCommand by putting the core jar on their classpath. Alternatively the command could be launched through kafka-run-class.sh like this:
...
From the compatibility point of view there might be a bigger impact as mentioned above. Since the command now uses the wire protocols (including some newly introduced ones), backward compatibility will be impacted. That means that a user can't use a 1.2 client to administrate a 1.0 broker, as some of the wire protocols don't exist in the older broker. This should be acceptable to most users, as most of the admin commands require the core jar on the classpath, which means that most of the time the commands are executed from an environment with the same version as the brokers. In the remaining cases users will have to launch the old command through kafka-run-class.sh. Therefore the old tool should still be available.
Deprecation
kafka.admin.ConfigCommand will print a warning message saying it is deprecated and will be removed in a future version.
Special Migration Tools
There are no tools required.
...
The current --zookeeper option will be disabled with this change, as it has minimal impact on the current users.
Listing multiple users' and clients' quotas at once won't be possible. If this is required, users would need to use the old tool.
Test Plan
Most of the functionality can be covered with end-to-end tests. There will also be unit tests to verify the protocol and the broker-side logic.
...