...
The broker hosting the controller receives all the above request (already the case today) and can accept partition mutations with the rate R partitions per second and the maximum burst worth of B partitions. All the operations worth of N partition mutations are accepted until the burst is exhausted. All the remaining are rejected with a `QUOTAa `THROTTLING_QUOTA_VIOLATED` EXCEEDED` error. The complete algorithm is formalised bellow.
...
Handling of new Clients
For new clients, the new ``THROTTLING_QUOTA_VIOLATED` EXCEEDED` error code will be used to inform client that a topic has been rejected. The `throttle_time_ms` field is used to inform the client about how long it has been throttled:
...
Public Interfaces
Protocol
In order to be able to distinguish the new clients from the old ones, we will bump to version of the following requests/responses:
- `CreateTopicsRequest` and `CreateTopicsResponse` to version 6.
- `CreatePartitionsRequest` and `CreatePartitionsResponse` to version 3.
- `DeleteTopicRequest` and `DeleteTopicResponse` to version 5.
Starting from the bumped version, the new `QUOTA_VIOLATED` error will be used. It won't be used for older versions.
We will add the `ErrorMessage` field in the `DeleteTopicResponse` as follow:
The following new error code will be introduced:
- THROTTLING_QUOTA_EXCEEDED: It will be used when the defined throttling quota is exceeded but only for the above RPCs.
While discussing the naming of the error. We have also thought about using LIMIT_QUOTA_EXCEEDED in the future when a limit quota is exceeded. A limit is final error whereas a throttling error can be retried.
In order to be able to distinguish the new clients from the old ones, we will bump to version of the following requests/responses:
- `CreateTopicsRequest` and `CreateTopicsResponse` to version 6.
- `CreatePartitionsRequest` and `CreatePartitionsResponse` to version 3.
- `DeleteTopicRequest` and `DeleteTopicResponse` to version 5.
Starting from the bumped version, the new `THROTTLING_QUOTA_EXCEEDED` error will be used. It won't be used for older versions.
We will add the `ErrorMessage` field in the `DeleteTopicResponse` as follow:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"apiKey": 20,
| ||||
Code Block | ||||
| ||||
{
"apiKey": 20,
"type": "response",
"name": "DeleteTopicsResponse",
// Version 1 adds the throttle time.
//
// Starting in version 2, on quota violation, brokers send out responses before throttling.
//
// Starting in version 3, a TOPIC_DELETION_DISABLED error code may be returned.
//
// Version 4 is the first flexible version.
//
// Version 5 adds the ErrorMessage field.
"validVersions": "0-5",
"flexibleVersions": "4+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+",
"about": "The results for each topic we tried to delete.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name" },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The deletion error, or 0 if the deletion succeeded." },
{ "name": "ErrorMessage", "type": "string", "versions": "5+", "nullableVersions": "5+", "ignorable": true,
"about": "The error message, or null if there was no error." }
]}
]
} |
...
They will be supported for <client-id>, <user> and <user, client-id> similar to the existing quotas. Defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> , <user> and <user, client-id> levels.* We keep the name intentionally generic to allow us to extend their coverage in the future. levels.
* We keep the name intentionally generic to allow us to extend their coverage in the future.
The new name works similarly to the already existing quota. It can be set or changed by using the `kafka-configs.sh` tool.
For instance, the above command defines a quota of 10 controller mutations per secs for the (user=user1, client-id=clientA):
Code Block | ||
---|---|---|
| ||
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'controller_mutations_rate=10' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'. |
The above commands list the quota of (user=user1, client-id=clientA):
Code Block | ||
---|---|---|
| ||
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are controller_mutations_rate=10,... (other quotas are listed as well) |
Broker Metrics
We propose to expose the following new metric in the Kafka Broker:
...
As mentioned, we propose to introduce a new retryable `QuotaViolatedException` exception which will be given back to the caller when a topic is rejected due to throttling. The new exception maps to the new `QUOTA`THROTTLING_VIOLATEDQUOTA_ERROR` EXCEEDED` error.
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Exception thrown if an operation on a resource violateexceeds the throttling quota. */ public class QuotaViolationExceptionThrottlineQuotaExceededException extends RetryableException { private int throttleTimeMs; public QuotaViolationException(int throttleTimeMs, String message) { super(message); this.throttleTimeMs = throttleTimeMs; } public QuotaViolationException(int throttleTimeMs, String message, Throwable cause) { super(message, cause); this.throttleTimeMs = throttleTimeMs; } public int throttleTimeMs() { return this.throttleTimeMs; } } |
...