...
0: Unknown
1: Any
2: Topic
3: Group
4: Broker
5: User (new)
6: Client (new)
QuotaType (new)
0: ProducerByteRate
1: ConsumerByteRate
2: RequestPercentage
QuotaSource (new)
0: Self
1: Default
2: Parent
Describe Quotas
To support the kafka-configs.sh use cases in which a quota is modified for a user, a client or a (user,client) pair, we have to create a protocol that handles quota listings. A new protocol is justified because a quota is quite different from a broker or topic config: a quota can be identified by a user, a client or even a (user,client) tuple, while a topic or broker config is identified only by the topic's name or the broker's ID.
Code Block |
---|
language | js |
---|
title | DescribeQuotas Request |
---|
|
DescribeQuotas Request (Version: 1) => [resource]
  resource => [quota_config_resource] [quota_type]
    quota_config_resource => type name
      type => INT8
      name => STRING
    quota_type => INT8 |
Request semantics:
- Can be sent to any broker
- If name is empty, the default quota is being requested. Responses are returned the same way for defaults.
- If the quota_type array is empty, all quotas are returned. Otherwise, only quotas of the provided types are returned.
- Authorization: "DescribeQuotas" can only be interpreted on the "Cluster" resource and is represented by the DescribeConfigs ACL because the use cases are similar. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
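The quota_type filtering rule above can be sketched as follows. This is a minimal illustration, not the broker implementation: the class QuotaFilterSketch, the method select, and the map from quota type id to value are all assumptions made for the example. The ids follow the QuotaType listing (0: ProducerByteRate, 1: ConsumerByteRate, 2: RequestPercentage).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: illustrates the quota_type filter rule.
class QuotaFilterSketch {
    // Returns every stored quota when requestedTypes is empty,
    // otherwise only the quotas whose type id was requested.
    static Map<Byte, Double> select(Map<Byte, Double> stored, List<Byte> requestedTypes) {
        if (requestedTypes.isEmpty()) {
            return new LinkedHashMap<>(stored);
        }
        Map<Byte, Double> result = new LinkedHashMap<>();
        for (Byte type : requestedTypes) {
            Double value = stored.get(type);
            if (value != null) {
                result.put(type, value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Byte, Double> stored = new LinkedHashMap<>();
        stored.put((byte) 0, 1048576.0); // ProducerByteRate
        stored.put((byte) 2, 50.0);      // RequestPercentage
        // Empty filter: every stored quota is returned.
        System.out.println(select(stored, Collections.<Byte>emptyList()));
        // Explicit filter: only RequestPercentage is returned.
        System.out.println(select(stored, Arrays.asList((byte) 2)));
    }
}
```

Types that were requested but have no stored quota are simply left out of the result in this sketch; the proposal does not say how such a case should be reported.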
...
Code Block |
---|
language | js |
---|
title | DescribeQuotas Response |
---|
|
DescribeQuotas Response (Version: 1) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_config_resource] [quota]
    quota_config_resource => type name
      type => INT8
      name => STRING
    quota => error_code error_message [quota_entry]
      error_code => INT16
      error_message => NULLABLE_STRING
      quota_entry => quota_type quota_value quota_source
        quota_type => INT8
        quota_value => DOUBLE
        quota_source => INT8 |
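To make the quota_entry layout concrete, the sketch below packs the three fields in the order given above (INT8, DOUBLE, INT8) with java.nio.ByteBuffer, which is big-endian by default and so matches the network byte order used by the Kafka protocol. Kafka's own serialization goes through its protocol classes; QuotaEntrySketch and its methods are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the quota_entry field order; not Kafka's serializer.
class QuotaEntrySketch {
    static final int SIZE = 1 + 8 + 1; // INT8 + DOUBLE + INT8

    static byte[] encode(byte quotaType, double quotaValue, byte quotaSource) {
        ByteBuffer buffer = ByteBuffer.allocate(SIZE); // big-endian by default
        buffer.put(quotaType);
        buffer.putDouble(quotaValue);
        buffer.put(quotaSource);
        return buffer.array();
    }

    static double decodeValue(byte[] entry) {
        // quota_value starts right after the one-byte quota_type
        return ByteBuffer.wrap(entry).getDouble(1);
    }

    public static void main(String[] args) {
        // ConsumerByteRate (1) with value 2048, taken from the Default source (1)
        byte[] entry = encode((byte) 1, 2048.0, (byte) 1);
        System.out.println(entry.length);
        System.out.println(decodeValue(entry));
    }
}
```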
Alter Quotas
Code Block |
---|
language | js |
---|
title | AlterQuotas Request |
---|
|
AlterQuotas Request (Version: 0) => validate_only [resource]
  validate_only => BOOLEAN
  resource => [quota_config_resource] [quota]
    quota_config_resource => type name
      type => INT8
      name => STRING
    quota => quota_type quota_value
      quota_type => INT8
      quota_value => DOUBLE |
Request Semantics
- Can be sent to any broker
- If name is empty, a default quota is being altered.
- If an Alter operation is attempted on a read-only config, an InvalidRequestException error will be returned for the relevant resource.
- Authorization: "AlterQuotas" can only be interpreted on the "Cluster" resource and is represented by the AlterConfigs ACL because the use cases are similar. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
- For tools that allow users to alter quota configs, a validation/dry-run mode is available via the validate_only parameter: validation errors are reported but no alteration is attempted.
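The validate_only behaviour can be pictured with the following sketch, in which validation always runs but the stored quota changes only when the flag is false. The class AlterQuotaSketch, the in-memory map, and the validation rule (a quota value must be a non-negative number) are assumptions for the example, not rules stated by this proposal.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of dry-run handling; not the broker implementation.
class AlterQuotaSketch {
    private final Map<Byte, Double> quotas = new HashMap<>();

    // Returns null on success, or an error message if validation fails.
    String alter(byte quotaType, double quotaValue, boolean validateOnly) {
        // Assumed validation rule, for illustration only.
        if (Double.isNaN(quotaValue) || quotaValue < 0) {
            return "invalid quota value: " + quotaValue;
        }
        if (!validateOnly) {
            quotas.put(quotaType, quotaValue); // applied only outside dry-run mode
        }
        return null;
    }

    // Returns the stored value, or null if no quota of this type has been set.
    Double current(byte quotaType) {
        return quotas.get(quotaType);
    }

    public static void main(String[] args) {
        AlterQuotaSketch sketch = new AlterQuotaSketch();
        System.out.println(sketch.alter((byte) 0, 1024.0, true));  // dry run, nothing stored
        System.out.println(sketch.current((byte) 0));
        System.out.println(sketch.alter((byte) 0, 1024.0, false)); // real alteration
        System.out.println(sketch.current((byte) 0));
    }
}
```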
...
Code Block |
---|
language | js |
---|
title | AlterQuotas Response |
---|
|
AlterQuotas Response (Version: 0) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_config_resource] [quota]
    quota_config_resource => type name
      type => INT8
      name => STRING
    quota => error_code error_message quota_type
      error_code => INT16
      error_message => NULLABLE_STRING
      quota_type => INT8 |
AdminClient APIs
Code Block |
---|
language | java |
---|
title | org.apache.kafka.clients.admin |
---|
|
/**
 * Represents a list of Resource objects that have a hierarchical relationship.
 * For instance, one could represent a relationship like "clientA of user1".
 */
public class ResourceList {
    public ResourceList(String user, String client);
    public ResourceList(ResourceType type, String resourceName);
    /**
     * Returns the list of resources in top-to-bottom (first to last) order.
     */
    public List<Resource> resources();
}
/**
 * This class acts as an alias for a HashMap whose keys are ResourceList objects,
 * i.e. lists of Resource objects that represent a hierarchical relationship.
 */
public class QuotaResourceMap<T> extends HashMap<ResourceList, T> {
    public QuotaResourceMap(ResourceList key, T value);
    public QuotaResourceMap(Map<ResourceList, T> map);
}
public class AdminClient {
    public DescribeQuotasResult describeQuotas(QuotaResourceMap<Collection<QuotaType>> quotaTypes, DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(QuotaResourceMap<Collection<Quota>> configs, AlterQuotasOptions options);
}
public class DescribeQuotasOptions {
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}
public class DescribeQuotasResult {
    public QuotaResourceMap<KafkaFuture<Collection<Quota>>> values();
}
public class AlterQuotasOptions {
    public AlterQuotasOptions timeoutMs(Integer timeout);
    public AlterQuotasOptions validateOnly(boolean validateOnly);
}
public class AlterQuotasResult {
    public QuotaResourceMap<KafkaFuture<Void>> results();
} |
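Because QuotaResourceMap is a HashMap keyed by ResourceList, lookups only behave as expected if the key type has value-based equals and hashCode. The proposal does not spell this out, so the following is only a sketch of that design point: ResourceKeySketch is a hypothetical stand-in for ResourceList, and plain "type:name" strings stand in for Resource objects.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ResourceList; resources are modelled as "type:name" strings.
final class ResourceKeySketch {
    private final List<String> resources;

    ResourceKeySketch(String user, String client) {
        // Top-to-bottom order: the user first, then the client of that user.
        this.resources = Arrays.asList("user:" + user, "client:" + client);
    }

    List<String> resources() {
        return resources;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ResourceKeySketch
            && resources.equals(((ResourceKeySketch) o).resources);
    }

    @Override
    public int hashCode() {
        return resources.hashCode();
    }

    public static void main(String[] args) {
        Map<ResourceKeySketch, Double> quotas = new HashMap<>();
        quotas.put(new ResourceKeySketch("user1", "clientA"), 1024.0);
        // A separately constructed but equal key finds the same entry.
        System.out.println(quotas.get(new ResourceKeySketch("user1", "clientA")));
    }
}
```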
Request API
Code Block |
---|
language | java |
---|
title | org.apache.kafka.common.requests |
---|
|
public class QuotaList {
    public QuotaList(ApiError error, Collection<Quota> entries);
    public QuotaList(Collection<Quota> entries);
    public ApiError error();
    public Collection<Quota> entries();
}
public static class Quota {
    public QuotaType type();
    public double value();
    public QuotaSource source();
}
public static enum QuotaType {
    PRODUCER_BYTE_RATE(0), CONSUMER_BYTE_RATE(1), REQUEST_PERCENTAGE(2);
    QuotaType(byte id);
    public byte id();
}
public static enum QuotaSource {
    SELF(0), DEFAULT(1), PARENT(2);
    QuotaSource(byte id);
    public byte id();
}
public class DescribeQuotasRequest extends AbstractRequest {
    public static Schema[] schemaVersions();
    public static DescribeQuotasRequest parse(ByteBuffer buffer, short version);
    public static class Builder extends AbstractRequest.Builder {
        public Builder(Map<List<Resource>, Collection<QuotaType>> quotaSettings);
        public DescribeQuotasRequest build(short version);
    }
    public DescribeQuotasRequest(short version, Map<List<Resource>, Collection<QuotaType>> quotaSettings);
    public DescribeQuotasRequest(Struct struct, short version);
    public Map<List<Resource>, Collection<QuotaType>> quotaSettings();
}
public class DescribeQuotasResponse extends AbstractResponse {
    public static Schema[] schemaVersions();
    public DescribeQuotasResponse(int throttleTimeMs, Map<List<Resource>, QuotaType> configs);
    public DescribeQuotasResponse(Struct struct, short version);
    public Map<List<Resource>, DescribeConfigsResponse.Config> quotaConfigSettings();
}
public class AlterQuotasRequest extends AbstractRequest {
    public static Schema[] schemaVersions();
    public AlterQuotasRequest(short version, Map<List<Resource>, AlterConfigsRequest.Config> configs, boolean validateOnly);
    public AlterQuotasRequest(Struct struct, short version);
    public Map<List<Resource>, DescribeConfigsResponse.Config> configs();
}
|
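For reference, a compilable form of the QuotaType enum from the request API listing is sketched below. Java does not implicitly narrow an int literal to a byte constructor argument, so the ids are cast explicitly. The enum is named QuotaTypeSketch to make clear it is an illustration, and the forId lookup is a hypothetical addition that shows how the INT8 read from the wire could be mapped back to a constant.

```java
// Illustrative, compilable variant of the QuotaType enum; forId is a hypothetical helper.
enum QuotaTypeSketch {
    PRODUCER_BYTE_RATE((byte) 0),
    CONSUMER_BYTE_RATE((byte) 1),
    REQUEST_PERCENTAGE((byte) 2);

    private final byte id;

    QuotaTypeSketch(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    // Maps a wire-level INT8 back to its enum constant.
    static QuotaTypeSketch forId(byte id) {
        for (QuotaTypeSketch type : values()) {
            if (type.id == id) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown quota type id: " + id);
    }
}
```

Throwing on an unknown id is one possible choice; the proposal does not define how unrecognised quota type ids should be handled.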
New Command Line Interface
...