With this configuration, if a client sends a request with 7 topics with 80 partitions each at time T, worth a total of 560 mutations, it brings the average rate R to 5.6 (560 / 100). As the rate is above the defined quota, any subsequent mutation is rejected until the average rate gets back to the allowed quota. Currently, the throttle time is computed as follows for our current quotas: (R - Q) / Q * S * W = (5.6 - 5) / 5 * 100 * 1 = 12 secs. In practice, the average rate won't go back down until the sample at time T is discarded, so no new mutation will be accepted before S * W seconds have elapsed: 100 secs in our case.
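The arithmetic of this example can be reproduced with a small sketch. The class and method names below are hypothetical and are not part of Kafka; the values Q = 5, S = 100 and W = 1 second are the ones used in the computation above.

```java
// Hypothetical helper, not part of Kafka: reproduces the arithmetic of the example.
final class RateThrottleSketch {
    // Average rate over the whole window of S samples of W seconds each.
    static double averageRate(double totalMutations, int samples, double windowSeconds) {
        return totalMutations / (samples * windowSeconds);
    }

    // Throttle time in seconds: (R - Q) / Q * S * W, or 0 when the rate is within the quota.
    static double throttleSeconds(double rate, double quota, int samples, double windowSeconds) {
        if (rate <= quota) {
            return 0.0;
        }
        return (rate - quota) / quota * samples * windowSeconds;
    }

    public static void main(String[] args) {
        double rate = averageRate(7 * 80, 100, 1.0); // 560 mutations over 100 seconds
        double throttle = throttleSeconds(rate, 5.0, 100, 1.0);
        System.out.println("rate=" + rate + " throttle=" + throttle + "s");
    }
}
```

The computed throttle time is roughly 12 seconds (up to floating-point rounding), which is well below the S * W = 100 seconds the client effectively has to wait.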

To overcome this, we propose to use a variation of the Token Bucket algorithm. The algorithm guarantees an average rate by accumulating tokens at the defined rate and allowing the accumulated tokens to be spent on operations. The burst is defined by the maximum number of tokens that can be accumulated. Let's define:

  • K: The number of tokens in the bucket
  • B: The maximum number of tokens that can be accumulated in the bucket (burst)
  • R: The rate at which we accumulate tokens
  • T: The last time K got updated

At time now, we update the number of tokens in the bucket with:

  • K = min(K + (now - T) * R, B)
  • T = now

A partition mutation with N partitions is admitted iff K >= 0, and updates K = K - N. The partition mutation request is rejected otherwise. The variation here is that we allow the number of tokens to become negative to accommodate any number of partitions for a topic. The throttle time is defined as: -K / R.
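The update and admission rules can be illustrated with a minimal sketch. This is not Kafka's `TokenBucket` implementation: the class name, the explicit `now` parameter (in seconds) and the assumption that the bucket starts full are all hypothetical. The throttle time is computed as -K / R, which matches the 60 / 5 = 12s figure of the worked example.

```java
// Hypothetical sketch, not Kafka's TokenBucket: time is passed in explicitly, in seconds.
final class TokenBucketSketch {
    private final double rate;   // R: tokens accumulated per second
    private final double burst;  // B: maximum number of tokens
    private double tokens;       // K: current number of tokens, may become negative
    private double lastUpdate;   // T: last time K was updated

    TokenBucketSketch(double rate, double burst, double now) {
        this.rate = rate;
        this.burst = burst;
        this.tokens = burst;     // assumption: the bucket starts full
        this.lastUpdate = now;
    }

    // K = min(K + (now - T) * R, B), then T = now.
    private void refill(double now) {
        tokens = Math.min(tokens + (now - lastUpdate) * rate, burst);
        lastUpdate = now;
    }

    // Admits the mutation iff K >= 0 after refilling, then spends N tokens (K may go negative).
    boolean tryAdmit(int partitions, double now) {
        refill(now);
        if (tokens < 0) {
            return false;
        }
        tokens -= partitions;
        return true;
    }

    // Throttle time in seconds: -K / R when K is negative, otherwise 0.
    double throttleSeconds(double now) {
        refill(now);
        return tokens < 0 ? -tokens / rate : 0.0;
    }

    double tokens() {
        return tokens;
    }
}
```

Because a request is only checked against K >= 0 before its cost is subtracted, a single large topic is always admitted when the bucket is not in debt, and the debt is then paid back at rate R.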

In order to reuse our existing configuration, the Token Bucket is configured based on Q, S and W as follows:

  • R = Q
  • B = Q * S * W

Our previous example would work as follows: R = 5, B = 500. The burst of 560 mutations brings the number of tokens to -60. The throttle time is -(-60) / 5 = 12s.
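As a quick check of the mapping from the existing configuration, the following sketch derives R and B from Q, S and W and replays the example. The class is a hypothetical helper written for illustration only; it assumes a full bucket before the burst.

```java
// Hypothetical helper: derives token bucket parameters from the existing quota configuration.
final class BucketConfigSketch {
    final double rate;   // R = Q
    final double burst;  // B = Q * S * W

    BucketConfigSketch(double quota, int samples, double windowSeconds) {
        this.rate = quota;
        this.burst = quota * samples * windowSeconds;
    }

    // Tokens left after spending `mutations` from a full bucket; may be negative.
    double tokensAfterBurst(double mutations) {
        return burst - mutations;
    }

    // Throttle time in seconds for a given token count: -K / R if K < 0, else 0.
    double throttleSeconds(double tokens) {
        return tokens < 0 ? -tokens / rate : 0.0;
    }

    public static void main(String[] args) {
        BucketConfigSketch cfg = new BucketConfigSketch(5.0, 100, 1.0);
        double k = cfg.tokensAfterBurst(560);
        System.out.println("R=" + cfg.rate + " B=" + cfg.burst
            + " K=" + k + " throttle=" + cfg.throttleSeconds(k) + "s");
    }
}
```

With Q = 5, S = 100 and W = 1, this yields R = 5, B = 500, K = -60 and a throttle time of 12 seconds.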

The Token Bucket will be implemented as a new `MeasurableStat` that will be used within a `Sensor` alongside the existing `Rate`. The quota will be enforced on the Token Bucket metric only. The `Sensor` will be updated to handle the verification of any Token Bucket metrics.

Controller Mutation Quota Manager

We propose to introduce a new quota manager called `ControllerMutationQuotaManager` which implements the algorithm described above. The new quota manager will be configured by a new quota type: `controller_mutation_rate`*.

* We keep the name intentionally generic to allow us to extend its coverage in the future.

New Broker Metrics

We propose to expose the following new metrics in the Kafka Broker:

| Group | Name | Tags | Description |
|---|---|---|---|
| ControllerMutation | rate | (user, client-id) - with the same rules used by existing quota metrics | The current rate. |
| ControllerMutation | tokens | (user, client-id) - with the same rules used by existing quota metrics | The remaining tokens in the bucket. < 0 indicates that throttling is applied. |
| ControllerMutation | throttle-time | (user, client-id) - with the same rules used by existing quota metrics | Tracking average throttle-time per user/client-id. |

New TokenBucket Metric

As mentioned, we propose to introduce a new metric named `TokenBucket` which implements the token bucket algorithm.

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;

/**
 * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token bucket algorithm
 * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
 */
public class TokenBucket implements MeasurableStat {
    public TokenBucket() {
        // Implementation omitted
    }

    public TokenBucket(TimeUnit unit) {
        // Implementation omitted
    }

    public double measure(final MetricConfig config, final long timeMs) {
        // Implementation omitted
    }

    public void record(final MetricConfig config, final double value, final long timeMs) {
        // Implementation omitted
    }
}
```
Admin API

As mentioned, we propose to introduce a new retryable `ThrottlingQuotaExceededException` which will be returned to the caller when a topic is rejected due to throttling. The new exception maps to the new `THROTTLING_QUOTA_EXCEEDED` error.


  • By default, the upgrade should be transparent since the Admin Client will automatically retry on QuotaViolationException and return it to the caller only if the retry timeout is reached. In this case, the caller must at minimum handle the RetryableException and retry. Handling retryable exceptions is something that we can safely expect from clients.
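To illustrate what handling a retryable exception on the caller side could look like, here is a minimal sketch. It deliberately does not use the Kafka client library: the two exception classes are local stand-ins, and the `throttleTimeMs` field, the default backoff of 100 ms and the `callWithRetry` helper are assumptions made for the example.

```java
// Stand-in types so the sketch compiles without kafka-clients; they are NOT the real Kafka classes.
class RetryableStandIn extends RuntimeException {
    RetryableStandIn(String message) {
        super(message);
    }
}

class ThrottledStandIn extends RetryableStandIn {
    final long throttleTimeMs; // hypothetical: how long the caller is asked to wait

    ThrottledStandIn(long throttleTimeMs) {
        super("throttling quota exceeded");
        this.throttleTimeMs = throttleTimeMs;
    }
}

final class RetrySketch {
    // Runs the call, retrying retryable failures until maxAttempts attempts have been made.
    static <T> T callWithRetry(java.util.function.Supplier<T> call, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get();
            } catch (RetryableStandIn e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the error to the caller
                }
                // Wait for the advertised throttle time, or an assumed default backoff.
                long waitMs = (e instanceof ThrottledStandIn)
                    ? ((ThrottledStandIn) e).throttleTimeMs
                    : 100L;
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
    }
}
```

A caller would wrap its topic creation call in `callWithRetry`; non-retryable failures propagate immediately since only the stand-in retryable type is caught.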

Rejected Alternatives


Update our existing Rate to behave like the Token Bucket


We have tried to modify our existing Rate to behave like the Token Bucket. That works but would not provide the same observability guarantees anymore. The Rate would no longer behave like it does today, and that is counterintuitive from an operations perspective. Therefore, we have preferred using a new metric alongside the Rate to enforce the quota.

Usage based Quota

We have considered using a usage-based quota, similar to the request quota. The usage would mainly be measured as the CPU time taken by the operations. While this would have the benefit of covering all the operations in the controller, we have decided against it for the following reasons: