DUE TO SPAM, SIGN-UP IS DISABLED. Goto Selfserve wiki signup and request an account.
...
Status
Current state: Accepted - 2.6.0 contains describe and alter functionality, resolve is pending for a future release.
Discussion thread: here
JIRA: KAFKA-7740
...
Admin client calls will be added to correspond to DescribeClientQuotas, ResolveClientQuotas, and AlterClientQuotas, with supporting types defined in the common.quotas package.
Common types in package org.apache.kafka.common.quota (2.6.0)
| Code Block | ||
|---|---|---|
| ||
/**
* Describes a client quota entity, which is a mapping of entity types to their names.
*/
public class ClientQuotaEntity {
/**
* The type of an entity entry.
*/
public static final String USER = "user";
public static final String CLIENT_ID = "client-id";
/**
* Constructs a quota entity for the given types and names. If a name is null,
* then it is mapped to the built-in default entity name.
*
* @param entries maps entity type to its name
*/
public ClientQuotaEntity(Map<String, String> entries);
/**
* @return map of entity type to its name
*/
public Map<String, String> entries();
}
/**
* Describes a component for applying a client quota filter.
*/
public class ClientQuotaFilterComponent {
/**
* Constructs and returns a filter component that exactly matches the provided entity
* name for the entity type.
*
* @param entityType the entity type the filter component applies to
* @param entityName the entity name that's matched exactly
*/
public static ClientQuotaFilterComponent ofEntity(String entityType, String entityName);
/**
* Constructs and returns a filter component that matches the built-in default entity name
* for the entity type.
*
* @param entityType the entity type the filter component applies to
*/
public static ClientQuotaFilterComponent ofDefaultEntity(String entityType);
/**
* Constructs and returns a filter component that matches any specified name for the
* entity type.
*
* @param entityType the entity type the filter component applies to
*/
public static ClientQuotaFilterComponent ofEntityType(String entityType);
/**
* @return the component's entity type
*/
public String entityType();
/**
* @return the optional match string, where:
* if present, the name that's matched exactly
* if empty, matches the default name
* if null, matches any specified name
*/
public Optional<String> match();
}
/**
* Describes a client quota entity filter.
*/
public class ClientQuotaFilter {
/**
* A filter to be applied to matching client quotas.
*
* @param components the components to filter on
* @param strict whether the filter only includes specified components
*/
private ClientQuotaFilter(Collection<ClientQuotaFilterComponent> components, boolean strict);
/**
* Constructs and returns a quota filter that matches all provided components. Matching entities
* with entity types that are not specified by a component will also be included in the result.
*
* @param components the components for the filter
*/
public static ClientQuotaFilter contains(Collection<ClientQuotaFilterComponent> components);
/**
* Constructs and returns a quota filter that matches all provided components. Matching entities
* with entity types that are not specified by a component will *not* be included in the result.
*
* @param components the components for the filter
*/
public static ClientQuotaFilter containsOnly(Collection<ClientQuotaFilterComponent> components);
/**
* Constructs and returns a quota filter that matches all configured entities.
*/
public static ClientQuotaFilter all();
/**
* @return the filter's components
*/
public Collection<ClientQuotaFilterComponent> components();
/**
* @return whether the filter is strict, i.e. only includes specified components
*/
public boolean strict();
}
/**
* Describes a configuration alteration to be made to a client quota entity.
*/
public class ClientQuotaAlteration {
public static class Op {
/**
* @param key the quota type to alter
* @param value if set then the existing value is updated,
* otherwise if null, the existing value is cleared
*/
public Op(String key, Double value);
/**
* @return the quota type to alter
*/
public String key();
/**
* @return if set then the existing value is updated,
* otherwise if null, the existing value is cleared
*/
public Double value();
}
private final ClientQuotaEntity entity;
private final Collection<Op> ops;
/**
* @param entity the entity whose config will be modified
* @param ops the alteration to perform
*/
public ClientQuotaAlteration(ClientQuotaEntity entity, Collection<Op> ops);
/**
* @return the entity whose config will be modified
*/
public ClientQuotaEntity entity();
/**
* @return the alteration to perform
*/
public Collection<Op> ops();
}
|
DescribeClientQuotas (2.6.0)
| Code Block | ||
|---|---|---|
| ||
public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> {
// Empty.
}
/**
* The result of the {@link Admin#describeClientQuotas(Collection, DescribeClientQuotasOptions)} call.
*/
public class DescribeClientQuotasResult {
/**
* Maps an entity to its configured quota value(s). Note if no value is defined for a quota
* type for that entity's config, then it is not included in the resulting value map.
*
* @param entities future for the collection of entities that matched the filter
*/
public DescribeClientQuotasResult(KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities);
/**
* Returns a map from quota entity to a future which can be used to check the status of the operation.
*/
public KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities();
}
public interface Admin extends AutoCloseable {
...
/**
* Describes all entities matching the provided filter that have at least one client quota configuration
* value defined.
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the future from the
* returned {@link DescribeClientQuotasResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
* If the authenticated user didn't have describe access to the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
* If the request details are invalid. e.g., an invalid entity type was specified.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the describe could finish.</li>
* </ul>
* <p>
* This operation is supported by brokers with version 2.6.0 or higher.
*
* @param filter the filter to apply to match entities
* @param options the options to use
* @return the DescribeClientQuotasResult containing the result
*/
DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options);
}
|
ResolveClientQuotas
...
(pending future release)
| Code Block | ||
|---|---|---|
| ||
public class ResolveClientQuotasOptions extends AbstractOptions<ResolveClientQuotasOptions> {
// Empty.
}
/**
* The result of the {@link Admin#resolveClientQuotas(Collection, ResolveClientQuotasOptions)} call.
*/
public class ResolveClientQuotasResult {
/**
* Information about a specific quota configuration entry.
*/
public class Entry {
/**
* @param source the entity source for the value
* @param value the non-null value
*/
public Entry(QuotaEntity source, Double value);
}
/**
* Information about the value for a quota type.
*
* NOTE: We maintain a `Value` class because additional information may be added, e.g.,
* a list of overridden entries.
*/
public class Value {
/**
* @param entry the quota entry
*/
public Value(Entry entry);
}
/**
* Maps a collection of entities to their resolved quota values.
*
* @param config the quota configuration for the requested entities
*/
public ResolveClientQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config);
/**
* Returns a map from quota entity to a future which can be used to check the status of the operation.
*/
public Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config();
/**
* Returns a future which succeeds only if all quota descriptions succeed.
*/
public KafkaFuture<Void> all();
}
public interface Admin extends AutoCloseable {
...
/**
* Resolves the effective quota values for the provided entities.
*
* @param entities the entities to describe the resolved quotas for
* @param options the options to use
* @return the resolved quotas for the entities
*/
ResolveClientQuotasResult resolveClientQuotas(Collection<QuotaEntity> entities, ResolveClientQuotasOptions options);
} |
AlterClientQuotas (2.6.0)
| Code Block | ||||
|---|---|---|---|---|
| ||||
/**
* Options for {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
public class AlterClientQuotasOptions extends AbstractOptions<AlterClientQuotasOptions> {
/**
* Returns whether the request should be validated without altering the configs.
*/
public boolean validateOnly();
/**
* Sets whether the request should be validated without altering the configs.
*/
public AlterClientQuotasOptions validateOnly(boolean validateOnly);
}
/**
* The result of the {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class AlterClientQuotasResult {
/**
* Maps an entity to its alteration result.
*
* @param futures maps entity to its alteration result
*/
public AlterClientQuotasResult(Map<ClientQuotaEntity, KafkaFuture<Void>> futures);
/**
* Returns a map from quota entity to a future which can be used to check the status of the operation.
*/
public Map<ClientQuotaEntity, KafkaFuture<Void>> values();
/**
* Returns a future which succeeds only if all quota alterations succeed.
*/
public KafkaFuture<Void> all();
}
public interface Admin extends AutoCloseable {
...
/**
* Alters client quota configurations with the specified alterations.
* <p>
* Alterations for a single entity are atomic, but across entities is not guaranteed. The resulting
* per-entity error code should be evaluated to resolve the success or failure of all updates.
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
* the returned {@link AlterClientQuotasResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
* If the authenticated user didn't have alter access to the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
* If the request details are invalid. e.g., a configuration key was specified more than once for an entity.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the alterations could finish. It cannot be guaranteed whether the update
* succeed or not.</li>
* </ul>
* <p>
* This operation is supported by brokers with version 2.6.0 or higher.
*
* @param entries the alterations to perform
* @return the AlterClientQuotasResult containing the result
*/
AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
} |
kafka-configs.sh/ConfigCommand (2.6.0)
As a result of introducing the APIs, the ConfigCommand will be updated to support the users and clients entity types when using the --bootstrap-server option. The modification to ConfigCommand was adopted in KIP-543, and usage will remain unchanged from the original --zookeeper functionality.
kafka-client-quotas.sh/ClientQuotasCommand (pending future release)
A ClientQuotasCommand would be constructed with an associated bin/kafka-client-quotas.sh script for managing quotas via command line, and would have three modes of operation, roughly correlating to each of the API calls:
...
In addition to the API changes above, the following write protocol would be implemented:
DescribeClientQuotas (2.6.0)
| Code Block |
|---|
{
"apiKey": 48,
"type": "request",
"name": "DescribeClientQuotasRequest",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "Components", "type": "[]ComponentData", "versions": "0+",
"about": "Filter components to apply to quota entities.", "fields": [
{ "name": "EntityType", "type": "string", "versions": "0+",
"about": "The entity type that the filter component applies to." },
{ "name": "MatchType", "type": "int8", "versions": "0+",
"about": "How to match the entity {0 = exact name, 1 = default name, 2 = any specified name}." },
{ "name": "Match", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The string to match against, or null if unused for the match type." }
]},
{ "name": "Strict", "type": "bool", "versions": "0+",
"about": "Whether the match is strict, i.e. should exclude entities with unspecified entity types." }
]
}
{
"apiKey": 48,
"type": "response",
"name": "DescribeClientQuotasResponse",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or `0` if the quota description succeeded." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The error message, or `null` if the quota description succeeded." },
{ "name": "Entries", "type": "[]EntryData", "versions": "0+", "nullableVersions": "0+",
"about": "A result entry.", "fields": [
{ "name": "Entity", "type": "[]EntityData", "versions": "0+",
"about": "The quota entity description.", "fields": [
{ "name": "EntityType", "type": "string", "versions": "0+",
"about": "The entity type." },
{ "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The entity name, or null if the default." }
]},
{ "name": "Values", "type": "[]ValueData", "versions": "0+",
"about": "The quota values for the entity.", "fields": [
{ "name": "Key", "type": "string", "versions": "0+",
"about": "The quota configuration key." },
{ "name": "Value", "type": "float64", "versions": "0+",
"about": "The quota configuration value." }
]}
]}
]
}
|
ResolveClientQuotas (pending future release)
| Code Block |
|---|
{
"apiKey": 50,
"type": "request",
"name": "ResolveClientQuotasRequest",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
"about": "The quota entity description.", "fields": [
{ "name": "EntityType", "type": "string", "versions": "0+",
"about": "The entity type." },
{ "name": "EntityName", "type": "string", "versions": "0+",
"about": "The entity name." }
]}
]
}
{
"apiKey": 50,
"type": "response",
"name": "ResolveClientQuotasResponse",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Entry", "type": "[]QuotaEntryData", "versions": "0+",
"about": "Resolved quota entries.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or `0` if the resolved quota description succeeded." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The error message, or `null` if the resolved quota description succeeded." },
{ "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
"about": "Resolved quota entries.", "fields": [
{ "name": "EntityType", "type": "string", "versions": "0+",
"about": "The entity type." },
{ "name": "EntityName", "type": "string", "versions": "0+",
"about": "The entity name." }
]},
{ "name": "QuotaValues", "type": "[]QuotaValueData", "versions": "0+",
"about": "Quota configuration values.", "fields": [
{ "name": "Type", "type": "string", "versions": "0+",
"about": "The quota type." },
{ "name": "Entry", "type": "[]ValueEntryData", "versions": "0+",
"about": "Quota value entries.", "fields": [
{ "name": "QuotaEntity", "type": "[]ValueQuotaEntity", "versions": "0+",
"about": "Resolved quota entries.", "fields": [
{ "name": "EntityType", "type": "string", "versions": "0+",
"about": "The entity type." },
{ "name": "EntityName", "type": "string", "versions": "0+",
"about": "The entity name." }
]},
{ "name": "Value", "type": "double", "versions": "0+",
"about": "The quota configuration value." }
]}
]}
]}
]
}
|
AlterClientQuotas (2.6.0)
| Code Block |
|---|
{
"apiKey": 49,
"type": "request",
"name": "AlterClientQuotasRequest",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "Entries", "type": "[]EntryData", "versions": "0+",
"about": "The quota configuration entries to alter.", "fields": [
{ "name": "Entity", "type": "[]EntityData", "versions": "0+",
"about": "The quota entity to alter.", "fields": [
{ "name": "EntityType", "type": "string", "versions": "0+",
"about": "The entity type." },
{ "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The name of the entity, or null if the default." }
]},
{ "name": "Ops", "type": "[]OpData", "versions": "0+",
"about": "An individual quota configuration entry to alter.", "fields": [
{ "name": "Key", "type": "string", "versions": "0+",
"about": "The quota configuration key." },
{ "name": "Value", "type": "float64", "versions": "0+",
"about": "The value to set, otherwise ignored if the value is to be removed." },
{ "name": "Remove", "type": "bool", "versions": "0+",
"about": "Whether the quota configuration value should be removed, otherwise set." }
]}
]},
{ "name": "ValidateOnly", "type": "bool", "versions": "0+",
"about": "Whether the alteration should be validated, but not performed." }
]
}
{
"apiKey": 49,
"type": "response",
"name": "AlterClientQuotasResponse",
"validVersions": "0",
"flexibleVersions": "none",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Entries", "type": "[]EntryData", "versions": "0+",
"about": "The quota configuration entries to alter.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or `0` if the quota alteration succeeded." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The error message, or `null` if the quota alteration succeeded." },
{ "name": "Entity", "type": "[]EntityData", "versions": "0+",
"about": "The quota entity to alter.", "fields": [
{ "name": "EntityType", "type": "string", "versions": "0+",
"about": "The entity type." },
{ "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The name of the entity, or null if the default." }
]}
]}
]
} |
Kafka RPC 'double' support (2.6.0)
Note that, while the ByteBuffer natively supports serializing a Double, the format in which the value is serialized is not strongly specified, so the preference is to explicitly ensure a standard representation of double-precision 64-bit IEEE 754 format. This is achieved in Java using Double.doubleToRawLongBits() and Double.longBitsToDouble() and should be easily portable to other languages.
...