...
* We keep the name intentionally generic to allow us to extend their coverage in the future.
The new name works similarly to the already existing quota. It can be set or changed by using the `kafka-configs.sh` tool.
For instance, the above command defines a quota of 10 controller mutations per secs for the (user=user1, client-id=clientA):
Code Block | ||
---|---|---|
| ||
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'controller_mutations_rate=10' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'. |
The above commands list the quota of (user=user1, client-id=clientA):
Code Block | ||
---|---|---|
| ||
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are controller_mutations_rate=10,... (other quotas are listed as well) |
Broker Metrics
We propose to expose the following new metric in the Kafka Broker:
...
Broker Metrics
We propose to expose the following new metric in the Kafka Broker:
Group | Name | Tags | Description |
---|---|---|---|
ControllerMutationsQuotaManager | rate | (user, client-id) - with the same rules used by existing quota metrics | The current rate. |
Admin API
As mentioned, we propose to introduce a new retryable `QuotaViolatedException` exception which will be given back to the caller when a topic is rejected due to throttling. The new exception maps to the new `THROTTLING_QUOTA_EXCEEDED` error.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Exception thrown if an operation on a resource exceeds the throttling quota.
*/
public class ThrottlineQuotaExceededException extends RetryableException {
private int throttleTimeMs;
public QuotaViolationException(int throttleTimeMs, String message) {
super(message);
this.throttleTimeMs = throttleTimeMs;
}
public QuotaViolationException(int throttleTimeMs, String message, Throwable cause) {
super(message, cause);
this.throttleTimeMs = throttleTimeMs;
}
public int throttleTimeMs() {
return this.throttleTimeMs;
}
} |
The `CreateTopicsOptions`, `CreatePartitionsOptions`, and `DeleteTopicsOptions` will be extended to include a flag indicating if `QuotaViolatedException` should be automatically retried by the AdminClient or not
Admin API
As mentioned, we propose to introduce a new retryable `QuotaViolatedException` exception which will be given back to the caller when a topic is rejected due to throttling. The new exception maps to the new `THROTTLING_QUOTA_EXCEEDED` error.
Code Block | ||||
---|---|---|---|---|
| ||||
/** * ExceptionOptions thrownfor if an operation on a resource exceeds the throttling quota{@link Admin#createTopics(Collection)}. * * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving public class ThrottlineQuotaExceededExceptionCreateTopicsOptions extends RetryableExceptionAbstractOptions<CreateTopicsOptions> { private int throttleTimeMs; boolean retryQuotaViolatedException = true; /** public QuotaViolationException(int throttleTimeMs, String message) {* Set the retry QuotaViolatedException to indicate wether QuotaViolatedException * should super(message); this.throttleTimeMs = throttleTimeMs; } be automatically retried or not. */ public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {} /** public QuotaViolationException(int throttleTimeMs, String message, Throwable cause) { super(message, cause); this.throttleTimeMs = throttleTimeMs; } public int throttleTimeMs() { return this.throttleTimeMs; } } |
The `CreateTopicsOptions`, `CreatePartitionsOptions`, and `DeleteTopicsOptions` will be extended to include a flag indicating if `QuotaViolatedException` should be automatically retried by the AdminClient or not.
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Options for {@link Admin#createTopics(Collection)}. * * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving public class CreateTopicsOptions extends AbstractOptions<CreateTopicsOptions> * Returns true if the QuotaViolatedException should be automatically retried * by the AdminClient. */ public boolean retryQuotaViolatedException(){} } /** * Options for {@link Admin#createPartitions(Map)}. * * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving public class CreatePartitionsOptions extends AbstractOptions<CreatePartitionsOptions> { private boolean retryQuotaViolatedException = true; /** * Set the retry QuotaViolatedException to indicate wether QuotaViolatedException * should be automatically retried or not. */ public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {} /** * Returns true if the QuotaViolatedException should be automatically retried * by the AdminClient. */ public boolean retryQuotaViolatedException(){} } /** * Options for {@link Admin#createPartitionsAdmin#deleteTopics(MapCollection)}. * * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving public class CreatePartitionsOptionsDeleteTopicsOptions extends AbstractOptions<CreatePartitionsOptions>AbstractOptions<DeleteTopicsOptions> { private boolean retryQuotaViolatedException = true; /** * Set the retry QuotaViolatedException to indicate wether QuotaViolatedException * should be automatically retried or not. */ public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {} /** * Returns true if the QuotaViolatedException should be automatically retried * by the AdminClient. */ public boolean retryQuotaViolatedException(){} } /** * Options for {@link Admin#deleteTopics(Collection)}. * * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving public class DeleteTopicsOptions extends AbstractOptions<DeleteTopicsOptions> { private boolean retryQuotaViolatedException = true; /** * Set the retry QuotaViolatedException to indicate wether QuotaViolatedException * should be automatically retried or not. */ public CreateTopicsOptions retryQuotaViolatedException(boolean retry) {} /** * Returns true if the QuotaViolatedException should be automatically retried * by the AdminClient. */ public boolean retryQuotaViolatedException(){} } |
Kafka Topic Command
...
Kafka Topic Command
We propose to disable the automatic try of the QuotaViolatedException for the `kafka-topics.sh` command in order to not have the command blocked until the retry period is exhausted.
Kafka Config Command
The new name works similarly to the already existing quota. It can be set or changed by using the `kafka-configs.sh` tool. When the new quota API is used, an old broker that does not support the new name will reject it with a INVALID_REQUEST error.
For instance, the above command defines a quota of 10 controller mutations per secs for the (user=user1, client-id=clientA):
Code Block | ||
---|---|---|
| ||
> bin/kafka-configs.sh --bootstrap-server ... --alter --add-config 'controller_mutations_rate=10' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'. |
The above commands list the quota of (user=user1, client-id=clientA):
Code Block | ||
---|---|---|
| ||
> bin/kafka-configs.sh --bootstrap-server ... --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are controller_mutations_rate=10,... (other quotas are listed as well) |
Known Limitations
Auto Topic Creation
...