Child pages
  • KIP-248 - Create New ConfigCommand That Uses The New AdminClient

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • 0: ClientInUser
  • 1: DefaultClientInUser
  • 2: User
  • 3: ClientInDefaultUser
  • 4: DefaultClientInDefaultUser
  • 5: DefaultUser
  • 6: Client
  • 7: DefaultClient

AdminOperation (new)

  • 0: AddUnknown
  • 1: SetAdd
  • 2: Set
  • 3: Delete

Describe Quotas

The justification for a new protocol is that a quota is quite different from a broker or topic config because a quota can sometimes be identified a simple user, client or even a (user,client) tuple while a topic or a broker config can be identified only by the topic's name or the broker's ID. Moreover quotas have their own well defined types.

...

  1. Can be sent to any broker
  2. If name is empty it means that altering a default quota is asked.
  3. Authorization:  "AlterQuotas" can only be interpreted on the "Cluster" resource and represented by the AlterConfigs ACL due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
  4. For tools that allow users to alter quota configs, a validation/dry-run mode where validation errors are reported but no creation is attempted is available via the validate_only parameter.
  5. Set operation also does Add if needed to be backward compatible with the existing ConfigCommand semantics.

 

Code Block
languagejs
titleAlterQuotas Response
AlterQuotas Response (Version: 0) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_resource] [quota]
    quota_resource => type name
      type => INT8
      name => STRING
    quota => error_code error_message quota_type
      error_code => INT16
      error_message => NULLABLE_STRING
      quota_type => INT8

...

To enable describing and altering SCRAM credentials we will use the DescribeConfigs and AlterConfigs protocols. There are no changes in the protocol's structure but we will allow the USER resource type to be passed in the protocol. When this happens, the server will know that SCRAM configs are asked and will send them in the response.  In case of AlterConfigs if a USER resource type is passed it will validate if there are only SCRAM credentials are changed. If not, then will fail with InvalidRequestException.

AlterConfigs

This request needs some change as currently the --add-config operation of ConfigCommand would do incremental operations in Zookeeper but the AlterConfigs protocol sets the whole properties object. The purpose of this change to add an enum parameter to the configs so that it can specify the operation (set/add/delete) which needs to be executed. 

Code Block
languagejava
titleAlterConfigs Request
AlterConfigs Request (Version: 1) => [resources] validate_only
  validate_only => BOOLEAN
  resources => resource_type resource_name [configs]
    resource_type => INT8
    resource_name => STRING
    configs => config_name config_value config_operation
      config_name => STRING
      config_value => STRING
      config_operation => INT8                                           <=// new addition

 

Request Semantics:

  1. By default in the broker we parse an AlterConfigRequest version 0 with Unknown operation and handle it with the currently existing behavior. Version 1 requests however must have the operation set to other than Unknown, otherwise an InvalidRequestException will be thrown.
  2. Set operation also does Add if needed to be backward compatible with the existing ConfigCommand semantics.

AdminClient APIs

Code Block
languagejava
titleorg.apache.kafka.clients.admin
public static class Quota {
    public QuotaType type();
    public double value();
    public QuotaSource source();
}

public enum QuotaType {
    PRODUCER_BYTE_RATE((byte) 0, "producer_byte_rate"),
    CONSUMER_BYTE_RATE((byte) 1, "consumer_byte_rate"),
    REQUEST_PERCENTAGE((byte) 2, "request_percentage");

    QuotaType(byte id, String name);
    public byte id();
    public String quotaName();
}
 
public enum AdminOperation {
    ADD((byte) 0),
    SET((byte) 1),
    DELETE((byte) 2);

    QuotaType(byte id);
    public byte id();
}

public enum QuotaSource {
    CLIENT_OF_USER((byte) 0, "Client of user"),
    DEFAULT_CLIENT_OF_USER((byte) 1, "Default client of user"),
    USER((byte) 2, "User"),
    CLIENT_OF_DEFAULT_USER((byte) 3, "Client of default user"),
    DEFAULT_CLIENT_OF_DEFAULT_USER((byte) 4, "Default client of default user"),
    DEFAULT_USER((byte) 5, "Default user"), CLIENT((byte) 6, "Client"),
    DEFAULT_CLIENT((byte) 7, "Default client");

    QuotaSource(byte id, String description);
    public byte id();
    public String description();
}
 
public class AdminClient {
    public DescribeQuotasResult describeQuotas(Map<List<ConfigResource>, Collection<QuotaType>>, DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(Map<List<ConfigResource>, Collection<Quota>> configs, AlterQuotasOptions options);
}
public class DescribeQuotasOptions { 
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}

public class DescribeQuotasResult {
    public Map<List<Resource>, KafkaFuture<Collection<Quota>>> values();
}
 
public class AlterQuotasOptions { 
    public AlterQuotasOptions timeoutMs(Integer timeout);
	public AlterQuotasOptions validateOnly(boolean validateOnly);
}

public class AlterQuotasResult {
    public Map<List<Resource>, KafkaFuture<Void>> results();
}

...