Current state: Accepted - 2.6.0 contains describe and alter functionality, resolve is pending for a future release.
Discussion thread: here
Admin client calls will be added to correspond to DescribeClientQuotas
, ResolveClientQuotas
, and AlterClientQuotas
, with supporting types defined in the common.quotas package.
Common types in package org.apache.kafka.common.quota (2.6.0)
Code Block | ||
| ||
/** * Describes a client quota entity, which is a mapping of entity types to their names. */ public class ClientQuotaEntity { /** * The type of an entity entry. */ public static final String USER = "user"; public static final String CLIENT_ID = "client-id"; /** * Constructs a quota entity for the given types and names. If a name is null, * then it is mapped to the built-in default entity name. * * @param entries maps entity type to its name */ public ClientQuotaEntity(Map<String, String> entries); /** * @return map of entity type to its name */ public Map<String, String> entries(); } /** * Describes a component for applying a client quota filter. */ public class ClientQuotaFilterComponent { /** * Constructs and returns a filter component that exactly matches the provided entity * name for the entity type. * * @param entityType the entity type the filter component applies to * @param entityName the entity name that's matched exactly */ public static ClientQuotaFilterComponent ofEntity(String entityType, String entityName); /** * Constructs and returns a filter component that matches the built-in default entity name * for the entity type. * * @param entityType the entity type the filter component applies to */ public static ClientQuotaFilterComponent ofDefaultEntity(String entityType); /** * Constructs and returns a filter component that matches any specified name for the * entity type. * * @param entityType the entity type the filter component applies to */ public static ClientQuotaFilterComponent ofEntityType(String entityType); /** * @return the component's entity type */ public String entityType(); /** * @return the optional match string, where: * if present, the name that's matched exactly * if empty, matches the default name * if null, matches any specified name */ public Optional<String> match(); } /** * Describes a client quota entity filter. */ public class ClientQuotaFilter { /** * A filter to be applied to matching client quotas. * * @param components the components to filter on * @param strict whether the filter only includes specified components */ private ClientQuotaFilter(Collection<ClientQuotaFilterComponent> components, boolean strict); /** * Constructs and returns a quota filter that matches all provided components. Matching entities * with entity types that are not specified by a component will also be included in the result. * * @param components the components for the filter */ public static ClientQuotaFilter contains(Collection<ClientQuotaFilterComponent> components); /** * Constructs and returns a quota filter that matches all provided components. Matching entities * with entity types that are not specified by a component will *not* be included in the result. * * @param components the components for the filter */ public static ClientQuotaFilter containsOnly(Collection<ClientQuotaFilterComponent> components); /** * Constructs and returns a quota filter that matches all configured entities. */ public static ClientQuotaFilter all(); /** * @return the filter's components */ public Collection<ClientQuotaFilterComponent> components(); /** * @return whether the filter is strict, i.e. only includes specified components */ public boolean strict(); } /** * Describes a configuration alteration to be made to a client quota entity. */ public class ClientQuotaAlteration { public static class Op { /** * @param key the quota type to alter * @param value if set then the existing value is updated, * otherwise if null, the existing value is cleared */ public Op(String key, Double value); /** * @return the quota type to alter */ public String key(); /** * @return if set then the existing value is updated, * otherwise if null, the existing value is cleared */ public Double value(); } private final ClientQuotaEntity entity; private final Collection<Op> ops; /** * @param entity the entity whose config will be modified * @param ops the alteration to perform */ public ClientQuotaAlteration(ClientQuotaEntity entity, Collection<Op> ops); /** * @return the entity whose config will be modified */ public ClientQuotaEntity entity(); /** * @return the alteration to perform */ public Collection<Op> ops(); } |
DescribeClientQuotas (2.6.0)
Code Block | ||
| ||
public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> { // Empty. } /** * The result of the {@link Admin#describeClientQuotas(Collection, DescribeClientQuotasOptions)} call. */ public class DescribeClientQuotasResult { /** * Maps an entity to its configured quota value(s). Note if no value is defined for a quota * type for that entity's config, then it is not included in the resulting value map. * * @param entities future for the collection of entities that matched the filter */ public DescribeClientQuotasResult(KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities); /** * Returns a map from quota entity to a future which can be used to check the status of the operation. */ public KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities(); } public interface Admin extends AutoCloseable { ... /** * Describes all entities matching the provided filter that have at least one client quota configuration * value defined. * <p> * The following exceptions can be anticipated when calling {@code get()} on the future from the * returned {@link DescribeClientQuotasResult}: * <ul> * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException} * If the authenticated user didn't have describe access to the cluster.</li> * <li>{@link org.apache.kafka.common.errors.InvalidRequestException} * If the request details are invalid. e.g., an invalid entity type was specified.</li> * <li>{@link org.apache.kafka.common.errors.TimeoutException} * If the request timed out before the describe could finish.</li> * </ul> * <p> * This operation is supported by brokers with version 2.6.0 or higher. * * @param filter the filter to apply to match entities * @param options the options to use * @return the DescribeClientQuotasResult containing the result */ DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options); } |
(pending future release)
Code Block | ||
| ||
public class ResolveClientQuotasOptions extends AbstractOptions<ResolveClientQuotasOptions> { // Empty. } /** * The result of the {@link Admin#resolveClientQuotas(Collection, ResolveClientQuotasOptions)} call. */ public class ResolveClientQuotasResult { /** * Information about a specific quota configuration entry. */ public class Entry { /** * @param source the entity source for the value * @param value the non-null value */ public Entry(QuotaEntity source, Double value); } /** * Information about the value for a quota type. * * NOTE: We maintain a `Value` class because additional information may be added, e.g., * a list of overridden entries. */ public class Value { /** * @param entry the quota entry */ public Value(Entry entry); } /** * Maps a collection of entities to their resolved quota values. * * @param config the quota configuration for the requested entities */ public ResolveClientQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config); /** * Returns a map from quota entity to a future which can be used to check the status of the operation. */ public Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config(); /** * Returns a future which succeeds only if all quota descriptions succeed. */ public KafkaFuture<Void> all(); } public interface Admin extends AutoCloseable { ... /** * Resolves the effective quota values for the provided entities. * * @param entities the entities to describe the resolved quotas for * @param options the options to use * @return the resolved quotas for the entities */ ResolveClientQuotasResult resolveClientQuotas(Collection<QuotaEntity> entities, ResolveClientQuotasOptions options); } |
AlterClientQuotas (2.6.0)
Code Block | ||||
| ||||
/** * Options for {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)}. * * The API of this class is evolving, see {@link Admin} for details. */ public class AlterClientQuotasOptions extends AbstractOptions<AlterClientQuotasOptions> { /** * Returns whether the request should be validated without altering the configs. */ public boolean validateOnly(); /** * Sets whether the request should be validated without altering the configs. */ public AlterClientQuotasOptions validateOnly(boolean validateOnly); } /** * The result of the {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call. * * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving public class AlterClientQuotasResult { /** * Maps an entity to its alteration result. * * @param futures maps entity to its alteration result */ public AlterClientQuotasResult(Map<ClientQuotaEntity, KafkaFuture<Void>> futures); /** * Returns a map from quota entity to a future which can be used to check the status of the operation. */ public Map<ClientQuotaEntity, KafkaFuture<Void>> values(); /** * Returns a future which succeeds only if all quota alterations succeed. */ public KafkaFuture<Void> all(); } public interface Admin extends AutoCloseable { ... /** * Alters client quota configurations with the specified alterations. * <p> * Alterations for a single entity are atomic, but across entities is not guaranteed. The resulting * per-entity error code should be evaluated to resolve the success or failure of all updates. * <p> * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from * the returned {@link AlterClientQuotasResult}: * <ul> * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException} * If the authenticated user didn't have alter access to the cluster.</li> * <li>{@link org.apache.kafka.common.errors.InvalidRequestException} * If the request details are invalid. e.g., a configuration key was specified more than once for an entity.</li> * <li>{@link org.apache.kafka.common.errors.TimeoutException} * If the request timed out before the alterations could finish. It cannot be guaranteed whether the update * succeed or not.</li> * </ul> * <p> * This operation is supported by brokers with version 2.6.0 or higher. * * @param entries the alterations to perform * @return the AlterClientQuotasResult containing the result */ AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options); } |
kafka-configs.sh/ConfigCommand (2.6.0)
As a result of introducing the APIs, the ConfigCommand
will be updated to support the users and clients entity types when using the --bootstrap-server
option. The modification to ConfigCommand
was adopted in KIP-543, and usage will remain unchanged from the original --zookeeper
kafka-client-quotas.sh/ClientQuotasCommand (pending future release)
A ClientQuotasCommand
would be constructed with an associated bin/kafka-client-quotas.sh
script for managing quotas via command line, and would have three modes of operation, roughly correlating to each of the API calls:
In addition to the API changes above, the following write protocol would be implemented:
DescribeClientQuotas (2.6.0)
Code Block |
{ "apiKey": 48, "type": "request", "name": "DescribeClientQuotasRequest", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "Components", "type": "[]ComponentData", "versions": "0+", "about": "Filter components to apply to quota entities.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type that the filter component applies to." }, { "name": "MatchType", "type": "int8", "versions": "0+", "about": "How to match the entity {0 = exact name, 1 = default name, 2 = any specified name}." }, { "name": "Match", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The string to match against, or null if unused for the match type." } ]}, { "name": "Strict", "type": "bool", "versions": "0+", "about": "Whether the match is strict, i.e. should exclude entities with unspecified entity types." } ] } { "apiKey": 48, "type": "response", "name": "DescribeClientQuotasResponse", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or `0` if the quota description succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The error message, or `null` if the quota description succeeded." }, { "name": "Entries", "type": "[]EntryData", "versions": "0+", "nullableVersions": "0+", "about": "A result entry.", "fields": [ { "name": "Entity", "type": "[]EntityData", "versions": "0+", "about": "The quota entity description.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The entity name, or null if the default." } ]}, { "name": "Values", "type": "[]ValueData", "versions": "0+", "about": "The quota values for the entity.", "fields": [ { "name": "Key", "type": "string", "versions": "0+", "about": "The quota configuration key." }, { "name": "Value", "type": "float64", "versions": "0+", "about": "The quota configuration value." } ]} ]} ] } |
ResolveClientQuotas (pending future release)
Code Block |
{ "apiKey": 50, "type": "request", "name": "ResolveClientQuotasRequest", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+", "about": "The quota entity description.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "about": "The entity name." } ]} ] } { "apiKey": 50, "type": "response", "name": "ResolveClientQuotasResponse", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Entry", "type": "[]QuotaEntryData", "versions": "0+", "about": "Resolved quota entries.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or `0` if the resolved quota description succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The error message, or `null` if the resolved quota description succeeded." }, { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+", "about": "Resolved quota entries.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "about": "The entity name." } ]}, { "name": "QuotaValues", "type": "[]QuotaValueData", "versions": "0+", "about": "Quota configuration values.", "fields": [ { "name": "Type", "type": "string", "versions": "0+", "about": "The quota type." }, { "name": "Entry", "type": "[]ValueEntryData", "versions": "0+", "about": "Quota value entries.", "fields": [ { "name": "QuotaEntity", "type": "[]ValueQuotaEntity", "versions": "0+", "about": "Resolved quota entries.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "about": "The entity name." } ]}, { "name": "Value", "type": "double", "versions": "0+", "about": "The quota configuration value." } ]} ]} ]} ] } |
AlterClientQuotas (2.6.0)
Code Block |
{ "apiKey": 49, "type": "request", "name": "AlterClientQuotasRequest", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "Entries", "type": "[]EntryData", "versions": "0+", "about": "The quota configuration entries to alter.", "fields": [ { "name": "Entity", "type": "[]EntityData", "versions": "0+", "about": "The quota entity to alter.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The name of the entity, or null if the default." } ]}, { "name": "Ops", "type": "[]OpData", "versions": "0+", "about": "An individual quota configuration entry to alter.", "fields": [ { "name": "Key", "type": "string", "versions": "0+", "about": "The quota configuration key." }, { "name": "Value", "type": "float64", "versions": "0+", "about": "The value to set, otherwise ignored if the value is to be removed." }, { "name": "Remove", "type": "bool", "versions": "0+", "about": "Whether the quota configuration value should be removed, otherwise set." } ]} ]}, { "name": "ValidateOnly", "type": "bool", "versions": "0+", "about": "Whether the alteration should be validated, but not performed." } ] } { "apiKey": 49, "type": "response", "name": "AlterClientQuotasResponse", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Entries", "type": "[]EntryData", "versions": "0+", "about": "The quota configuration entries to alter.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or `0` if the quota alteration succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The error message, or `null` if the quota alteration succeeded." }, { "name": "Entity", "type": "[]EntityData", "versions": "0+", "about": "The quota entity to alter.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The name of the entity, or null if the default." } ]} ]} ] } |
Kafka RPC 'double' support (2.6.0)
Note that, while the ByteBuffer
natively supports serializing a Double
, the format in which the value is serialized is not strongly specified, so the preference is to explicitly ensure a standard representation of double-precision 64-bit IEEE 754 format. This is achieved in Java using Double.doubleToRawLongBits()
and Double.longBitsToDouble()
and should be easily portable to other languages.