Current state: Under Discussion 

Discussion thread: here

JIRA: here

This KIP is joint work proposed by Omnia Ibrahim and Claude Warren.


The motivations here are similar to KIP-854 Separate configuration for producer ID expiry.  Idempotent producers have been the default in Kafka since KIP-679: Producer will enable the strongest delivery guarantee by default. As a result, every producer instance is assigned a PID. The growing number of PIDs stored in Kafka brokers by ProducerStateManager exposes the broker to OOM errors if it has a high number of producers, or rogue or misconfigured clients. Here are a few use cases that might cause an increase in PIDs:

  • Clients fail after producing and enter a repeated restart cycle for a while.
  • Users initialise a new Producer instance every time they encounter any type of error instead of waiting for the producer instance to auto-recover.
  • Users initialise a new Producer per batch.

As a result, the broker will hit OOM and go offline. The only way to recover is to increase the heap.

KIP-854 added a separate config to expire PIDs independently of transactional IDs. However, the broker is still exposed to OOM if it accumulates a high number of PIDs before that expiration time is exceeded, and decreasing the expiration value would impact all clients, which is not always desirable. It would be better to target only inefficient users and stop them from crowding the map of PIDs to ProducerState maintained by ProducerStateManager.

This KIP proposes to throttle the number of PIDs at the leader of the partition by adding a new rate quota that will be applied while handling the PRODUCE request. This way the broker can reject only misbehaving users, by rejecting early in the process any unseen PIDs beyond the quota within a given window, and protect itself without impacting well-behaved users. As with most Kafka quotas, the client will receive a throttle time, after which it can resume.

Proposed Changes

We propose adding a new QuotaManager called ProducerIdQuotaManager at the PRODUCE request level in the Kafka API that limits the number of active PIDs per user (KafkaPrincipal). The number of active PIDs will be defined as a rate within a period of time (similar to the ControllerMutation quota). Any ProduceRequest with unseen PIDs above this rate will be rejected within the given period of time, and the client will receive a throttleTimeMs to decide when it can retry.

The new quota will be applied based on 

  • /config/users/<user>
  • /config/users/<default>


  • ProducerIdQuotaManager will be applied per KafkaPrincipal as it's a smaller subset which is known to the administrator of the cluster. It will not be applied to ClientId (which is not enforced by the client config) nor to a combination of KafkaPrincipal and ClientId.
  • ProducerIdQuotaManager will keep a cache of user (KafkaPrincipal) to unique active PIDs to track active PIDs. The cache is used only to decide whether we have encountered a PID before for a given KafkaPrincipal, and not as a counter. The cache will be implemented using a simple layered Bloom filter controlled by time to avoid any unwanted growth that might cause OOM. (More details on this are given in the next section.)
  • ProducerIdQuotaManager will check whether the caching layer contains the PID for the given user.
    • If the cache layer doesn't contain the PID, the quota manager will add the PID for this user to the cache and increment the quota rate metrics.
    • If the cache layer contains the PID, there is no need to update the cache or increment the quota rate metrics.
  • ProducerIdQuotaManager will throttle produce requests from a given user once it reaches the allowed quota of the rate metrics. The quota manager will throw QuotaViolationException, similar to existing quotas in Kafka, and the client will receive a ProduceResponse with throttleTimeMs, similar to throttling of bandwidth or request.

An example: 

If User-A has a producer_ids_rate of 100 PIDs per hour and the broker can store 2,000,000 PIDs before hitting out of memory, then the leader will store only 100 PIDs for User-A in 1 hour and throttle any unseen PID beyond that, as these will be considered new PIDs. The producer will receive a response with QuotaViolationException and a throttleTimeMs, similar to other Kafka quotas.
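The check-then-throttle flow described above can be sketched roughly as follows. This is a simplified illustration, not the proposed ProducerIdQuotaManager: the class name, the QuotaExceeded exception (a stand-in for QuotaViolationException), the fixed window that resets all state, and the throttle time computed as the time remaining in the window are all assumptions made for the example. A HashSet stands in for the layered Bloom filter, and rejected PIDs are not cached.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of per-principal PID throttling; all names are illustrative. */
class ProducerIdQuotaSketch {
    /** Stand-in for Kafka's QuotaViolationException, carrying a throttle time. */
    static class QuotaExceeded extends RuntimeException {
        final long throttleTimeMs;
        QuotaExceeded(long throttleTimeMs) {
            super("producer id quota exceeded");
            this.throttleTimeMs = throttleTimeMs;
        }
    }

    private final int maxNewPidsPerWindow;
    private final long windowMs;
    // A HashSet stands in for the layered Bloom filter described in the KIP.
    private final Map<String, Set<Long>> seenPids = new HashMap<>();
    private final Map<String, Integer> newPidCount = new HashMap<>();
    private long windowStartMs;

    ProducerIdQuotaSketch(int maxNewPidsPerWindow, long windowMs, long nowMs) {
        this.maxNewPidsPerWindow = maxNewPidsPerWindow;
        this.windowMs = windowMs;
        this.windowStartMs = nowMs;
    }

    /**
     * Returns true if the PID was already seen for this principal in the current window.
     * Throws QuotaExceeded if the PID is unseen and the principal has used up its quota.
     */
    boolean maybeRecord(String principal, long pid, long nowMs) {
        if (nowMs - windowStartMs >= windowMs) {
            // Simplification: a fixed window that forgets everything when it rolls over.
            seenPids.clear();
            newPidCount.clear();
            windowStartMs = nowMs;
        }
        Set<Long> pids = seenPids.computeIfAbsent(principal, p -> new HashSet<>());
        if (pids.contains(pid)) {
            return true; // known PID: no cache update, no quota charge
        }
        int count = newPidCount.getOrDefault(principal, 0);
        if (count >= maxNewPidsPerWindow) {
            // Assumed throttle time: whatever is left of the current window.
            throw new QuotaExceeded(windowStartMs + windowMs - nowMs);
        }
        pids.add(pid);
        newPidCount.put(principal, count + 1);
        return false;
    }
}
```

With a quota of 100 and a one-hour window, the first 100 distinct PIDs for User-A are accepted, repeated PIDs are never charged, and the 101st unseen PID is rejected with a throttle time; other principals are unaffected.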

Caching layer to track active PIDs per KafkaPrincipal

The cache will be represented as a map of KafkaPrincipal to layered Bloom filter (LayeredBloomFilter from Apache-Commons). The layered Bloom filter will be constructed from Bloom filters, each with an associated timestamp.  Layered Bloom filters have 3 methods to manage the layers: builder, cleanup, and extendCheck.  The builder constructs new Bloom filters for the layers as they are needed.  Cleanup removes old layers as they expire, and extendCheck determines if a new layer needs to be added.

  • Builder will simply build a new Bloom filter with the proper Shape and an associated timestamp.
  • Cleanup will remove layers that are older than
  • extendCheck will add layers in time increments and will also check the latest filter to see if it is full, adding a new layer if it is.  The time increment for a new layer is / .

Once all of these are configured, a layered Bloom filter in a stable system will have the number of layers specified by , will maintain the false positive rate established by the Shape, and will automatically remove expired Bloom filters.
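To make the builder, cleanup, and extendCheck roles concrete, here is a minimal, self-contained sketch of a time-layered Bloom filter. It does not use the Apache Commons LayeredBloomFilter; the class name, the BitSet-backed layers, the hashing scheme, and the constructor parameters are all assumptions chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;

/** Hypothetical, simplified stand-in for a time-layered Bloom filter of PIDs. */
class LayeredPidFilterSketch {
    private static final class Layer {
        final BitSet bits;
        final long createdMs;
        int inserted;
        Layer(int numBits, long createdMs) {
            this.bits = new BitSet(numBits);
            this.createdMs = createdMs;
        }
    }

    private final int numBits;
    private final int numHashes;
    private final int maxPerLayer;
    private final long windowMs;
    private final long layerMs;
    private final Deque<Layer> layers = new ArrayDeque<>(); // oldest first

    LayeredPidFilterSketch(int numBits, int numHashes, int maxPerLayer, long windowMs, int layerCount) {
        this.numBits = numBits;
        this.numHashes = numHashes;
        this.maxPerLayer = maxPerLayer;
        this.windowMs = windowMs;
        this.layerMs = windowMs / layerCount; // time increment between layers
    }

    // SplitMix64 finalizer, used here as a cheap 64-bit mixing function.
    private static long mix(long x) {
        x = (x ^ (x >>> 30)) * 0xBF58476D1CE4E5B9L;
        x = (x ^ (x >>> 27)) * 0x94D049BB133111EBL;
        return x ^ (x >>> 31);
    }

    // Double hashing: the i-th bit index is derived from two mixed values.
    private int index(long pid, int i) {
        long h1 = mix(pid);
        long h2 = mix(h1 ^ 0x9E3779B97F4A7C15L) | 1L;
        return (int) Long.remainderUnsigned(h1 + i * h2, numBits);
    }

    private boolean layerContains(Layer layer, long pid) {
        for (int i = 0; i < numHashes; i++) {
            if (!layer.bits.get(index(pid, i))) return false;
        }
        return true;
    }

    /** cleanup: drop layers that are older than the window. */
    private void cleanup(long nowMs) {
        while (!layers.isEmpty() && nowMs - layers.peekFirst().createdMs >= windowMs) {
            layers.pollFirst();
        }
    }

    /** extendCheck: add a layer if none exists, or the newest is too old or full. */
    private void extendCheck(long nowMs) {
        Layer newest = layers.peekLast();
        if (newest == null || nowMs - newest.createdMs >= layerMs || newest.inserted >= maxPerLayer) {
            layers.addLast(new Layer(numBits, nowMs)); // builder: fresh timestamped layer
        }
    }

    /** Returns true if the PID was probably seen within the window; otherwise records it. */
    boolean track(long pid, long nowMs) {
        cleanup(nowMs);
        for (Layer layer : layers) {
            if (layerContains(layer, pid)) return true;
        }
        extendCheck(nowMs);
        Layer target = layers.peekLast();
        for (int i = 0; i < numHashes; i++) {
            target.bits.set(index(pid, i));
        }
        target.inserted++;
        return false;
    }

    int layerCount() {
        return layers.size();
    }
}
```

Reads consult every live layer while writes go only to the newest one. Note that this sketch does not refresh a long-lived PID into newer layers, so a PID recorded only in the oldest layer is reported as new again once that layer expires.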

A user's PIDs in the Bloom filter in the caching layer will go through the following steps in their lifecycle:

  • Step1: Adding the first PID for a user (let's call this user userA) will create an entry for this user in the cache and will add the PID to the LayeredBloomFilter.
    • Now the cache layer is storing the following entry for the user

      Map { "UserA" -> LayeredBloomFilter {
                          "filters" -> [ TimestampedBloomFilter1 ]
                       }
          }
    • Any new PIDs will be added to the first TimestampedBloomFilter until / seconds elapses.
  • Step2: A new timestamped Bloom filter will be created by the LayeredBloomFilter.
    • All new PIDs from this point will be added to the new timestamped Bloom filter (TimestampedBloomFilter2). 
    • Both bloom filters will be used to check if we came across the same PID before or not. 
    • Now the cache layer is storing the following entry for the user

      Map { "UserA" -> LayeredBloomFilter {
                          "filters" -> [ TimestampedBloomFilter1, TimestampedBloomFilter2 ]
                       }
          }
  • Step3: The old bloom filter (TimestampedBloomFilter1) will be disposed once we reach .
    • Now the cache layer is storing the following entry for the user 

      Map { "UserA" -> LayeredBloomFilter {
                          "filters" -> [ TimestampedBloomFilter2 ]
                       }
          }
  • Step4: Repeat steps 2 and 3 ad infinitum.

Assuming that = 4  and  = 1hr the timeline for the Bloom filters looks like.

This way each user within a window will have 4 Bloom filters (all are available for reads while only the newest is active for writes), and when we need to check whether we have come across a PID before for a given user, we will check if any of the Bloom filters contains the PID.
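As a rough aid for the 4-layer, 1-hour case, the following sketch lists when each layer would be created and when it would expire. It assumes PIDs arrive continuously so that every layer is created on schedule; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists layer creation and expiry times. */
class LayerTimelineSketch {
    /** Times are in minutes, counted from the first PID seen for the user. */
    static List<String> timeline(Duration window, int layerCount, int layersToShow) {
        long windowMin = window.toMinutes();
        long stepMin = windowMin / layerCount; // a new layer every window/layerCount
        List<String> rows = new ArrayList<>();
        for (int k = 0; k < layersToShow; k++) {
            long created = k * stepMin;
            rows.add(String.format("layer %d: created at %d min, expires at %d min",
                    k + 1, created, created + windowMin));
        }
        return rows;
    }

    public static void main(String[] args) {
        timeline(Duration.ofHours(1), 4, 6).forEach(System.out::println);
    }
}
```

Under these assumptions a new layer starts every 15 minutes and each layer lives for 60 minutes, so from minute 45 onwards four layers are live at any time.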

Important Note:

A user's entry in the cached map will be removed entirely from the caching layer once it no longer has any active Bloom filters attached to it. This performs the cleanup for inactive users.

Public Interfaces

New Broker Configurations

We propose to introduce the following new configuration to the Kafka broker: 

  • Window Configs: Similar to other Kafka quotas we need window configurations:

Name | Type | Default | Description
 | Int | 11 | The number of samples to retain in memory for producer id quotas.
 | Int | 3600 | The time span of each sample for producer id quotas. Default is 1hr.

  • Cache Configs: Control the quota cache layer:

Name | Type | Default | Description
 | Int | 10 | The frequency in ms that the producer id quota manager will check for disposed cached windows.

New Quota Types

We propose to introduce the following new quota type in the Kafka broker:

Name | Type | Default | Description
producer_ids_rate | Double | Long.MaxValue | An upper bound on the number of producer ids accepted for the specified user within an hour.

  • The config will be supported for /config/users/<user> only, as we are trying to avoid growth of the caching layer; the set of <user> entities is known to the operator of the cluster and can be controlled more easily than client-id.
  • producer_ids_rate will control the size of the Shape per user.
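To illustrate how a rate could translate into a Bloom filter Shape, the sketch below applies the textbook sizing formulas m = ceil(-n ln p / (ln 2)^2) and k = round((m / n) ln 2), where n is the expected number of items and p the target false positive rate. Treating producer_ids_rate as n and using p = 0.001 are assumptions for the example, and the rounding used by the Apache Commons Shape may differ.

```java
/** Hypothetical sizing helper based on the standard Bloom filter formulas. */
class BloomShapeSketch {
    /** Number of bits m needed for n items at false positive rate p. */
    static long bitsFor(int n, double p) {
        double ln2 = Math.log(2);
        return (long) Math.ceil(-n * Math.log(p) / (ln2 * ln2));
    }

    /** Number of hash functions k that minimises the false positive rate. */
    static int hashesFor(int n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        int rate = 100;          // assumed producer_ids_rate
        double fpr = 0.001;      // assumed false positive rate
        long bits = bitsFor(rate, fpr);
        System.out.println(bits + " bits, " + hashesFor(rate, bits) + " hash functions");
    }
}
```

For 100 PIDs at a 0.1% false positive rate this gives 1438 bits and 10 hash functions, so a single layer for such a user costs well under a kilobyte.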

  • Extend QuotaConfigs to handle the new quota type
// Requires: import org.apache.kafka.common.config.ConfigDef;
public class QuotaConfigs {
    public static final String PRODUCER_ID_RATE_OVERRIDE_CONFIG = "producer_ids_rate";
    public static final String PRODUCER_ID_RATE_DOC = "A rate representing the upper bound of active producer ids.";

    public static ConfigDef buildProducerIdsConfig(ConfigDef configDef) {
        // DOUBLE-typed quota; the default is large enough to be effectively unlimited.
        configDef.define(PRODUCER_ID_RATE_OVERRIDE_CONFIG, ConfigDef.Type.DOUBLE,
            Integer.valueOf(Integer.MAX_VALUE).doubleValue(), ConfigDef.Importance.MEDIUM, PRODUCER_ID_RATE_DOC);
        return configDef;
    }
}
  • Extend `DynamicConfig` and `ClientQuotaControlManager.configKeysForEntityType` to handle the new quota.

New Broker Metrics

The new metrics will be exposed by the broker:

ProducerIds | rate | user | The current rate.
ProducerIds | tokens | user | The remaining tokens in the bucket. < 0 indicates that throttling is applied.
ProducerIds | throttle-time | user | Tracking average throttle-time per user.

Client Errors

The new quota type will use QuotaViolationException similar to ClientQuotaManager. And the client will receive ProduceResponse with throttleTimeMs similar to throttling of bandwidth or request.

New ProducerIdQuotaManagerCache

public class ProducerIDQuotaManagerCache {
    /**
     * Constructor.  This manager tracks the PIDs for each principal.
     * PIDs will expire from the Manager after the number of seconds specified in the {@code windowLengthInSeconds}
     * parameter.  A new window is started every windowLengthInSeconds/layerCount seconds.
     * @param falsePositiveRate The acceptable false positive rate.
     * @param windowLengthInSeconds The number of seconds each window should last.
     * @param layerCount  The number of layers that should be active at any one time.
     */
    public ProducerIDQuotaManagerCache(Double falsePositiveRate, long windowLengthInSeconds, int layerCount);

    /**
     * Ensures that the PID is tracked as being seen in the last logical window.
     * @param principal the principal to track.
     * @param producerIdRate the rate per hour at which PIDs are expected to be generated.
     * @param pid the PID to track.
     * @return true if it was found in the manager.
     */
    public boolean track(KafkaPrincipal principal, int producerIdRate, long pid);
}



class TimestampedBloomFilter extends WrappedBloomFilter {
        long expires;
        TimestampedBloomFilter(Shape shape, long ttl) {
            super(new SimpleBloomFilter(shape));
            // Expiry is stored as an absolute timestamp: creation time plus ttl.
            expires = System.currentTimeMillis() + ttl;
        }
}

Tools

kafka-configs will be extended to support the new quota.  A new quota property will be added, which can be applied to <user>:

  • producer_ids_rate: The number of active PIDs per quota window.

For example:

bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_ids_rate=50' --entity-name user1 --entity-type users

Default quotas for <user> can be configured by omitting entity name. For example:

bin/kafka-configs --bootstrap-server localhost:9092 --alter --add-config 'producer_ids_rate=200' --entity-type users

Known Limitations

  • As we are using BloomFilter we might get false positives. This can be controlled by
  • Throttling based on User will punish every client used by the same user. However, this is a similar risk to existing quotas that are applied only at the KafkaPrincipal level.
  • Similar to other quotas in Kafka, all throttling is against individual brokers. This means that if leadership changes, the new leader will start throttling from zero if it never had this KafkaPrincipal producing to it before.
  • Depending on the configuration, some producers might get throttled for a long time instead of crashing. This may go unnoticed for some producers, especially if they don't alert on the throttle or on other metrics that would notify them when the producer has stopped producing.

Compatibility, Deprecation, and Migration Plan

Compatibility with Old Clients

  • None, since we are using the same throttling from ClientQuota which the client knows how to handle.

Misconfigured producers that cause this problem in the first place might not notice that they are being throttled, especially if they don't monitor the throttling metrics. This is a known limitation of the KIP at the moment.

Rejected Alternatives

1. Limit the total active producer ID allocation number

This solution is the simplest. However, as stated in the motivation, the OOM is always caused by a rogue or misconfigured client, and this solution would punish good clients alongside the rogue one.

2. Having a limit to the number of active producer IDs

The idea here is that if we had a misconfigured client, we would expire the older entries. This solution risks the idempotency guarantees. There is also a risk that we may end up expiring the PIDs of good clients, as there is no way to link a PID back to a specific client at this point.

3. Allow clients to "close" the producer ID usage

Part of the root cause of the OOM problem is that we keep PID metadata in the broker even if the producer is "closed". This solution would provide a close API (for example END_PRODUCER_ID) and the broker would remove the PID metadata on its side. On the client side, we can send it when the producer is closing. This solution is better; however:

  • It only improves the situation with new clients, leaving the broker exposed to OOM because of old producers.
  • It doesn't address producers that enter a repeated restart cycle, as these producers will be crashing and will not call the producer.close method.

We may need to consider improving the Producer Client anyway to include this at some point, but it is not part of the scope of this KIP.

4. Throttle INIT_PRODUCER_ID requests

This solution might look simple; however, throttling INIT_PRODUCER_ID doesn't guarantee that the OOM won't happen, because:

    1. INIT_PRODUCER_ID requests for idempotent producers go to the least loaded broker, which changes over time. If INIT_PRODUCER_ID gets throttled on one broker, there is no guarantee it will not go through on the next least loaded broker, causing OOM at the leader later. We can in theory solve this by implementing a way to shard and direct requests between brokers using a hash of the KafkaPrincipal instead. However, this wouldn't eliminate the OOM on the leader when we lose one broker and leadership, along with its PIDs, moves to the next broker. The example below explains this point in some detail.
    2. The problem happens on the activation of the PID when it produces, not at initialisation. This means Kafka wouldn't have an OOM problem if the producer was assigned a PID but crashed before producing anything. So, similar to the point above, moving the leadership with PIDs might lead to OOM on the next broker.
    3. Throttling producers that crash between initialisation and producing could slow them down when they recover from or fix the problem that caused them to crash right after initialising the PID.
    4. Old producers won't know how to handle a throttling error. Instead, this implementation would need to fall back on an existing fatal error that crashes the producer on initialisation, such as an authorisation/authentication error, and maybe introduce a new error for newer producer versions. Producers also don't have a mechanism to retry errors at initialisation, which would be a limitation of this solution, especially for well-behaved producers.

Example for why this solution won't prevent OOM: 

Let's imagine a cluster with 6 brokers where each broker can cache 200 PID states before hitting OOM. Each KafkaPrincipal can send 10 INIT_PRODUCER_ID requests and we have 20 active KafkaPrincipals, so in total we can have 200 INIT_PRODUCER_ID requests. Now on the leadership side of producing we have:

  • Broker-1 has 100 PID in memory for all partitions it is leading

  • Broker-2 has 150 PID in memory for all partitions it is leading

  • Broker-3 has 100 PID in memory for all partitions it is leading

  • Broker-4 has 120 PID in memory for all partitions it is leading

  • Broker-5 has 100 PID in memory for all partitions it is leading

  • Broker-6 has 90  PID in memory for all partitions it is leading 

Now Broker-1 goes down. For the sake of this example, not all 100 PIDs will connect to one broker; instead, 60 PIDs will connect to Broker-2, bringing its total to 210. These PIDs aren’t going to be throttled by INIT_PRODUCER_ID as they have already been initialised, so they will be loaded into the cache on Broker-2, which hits OOM. Now Broker-2 is down too. With Broker-1 and Broker-2 down, the problem propagates and the cluster goes down, as each broker that tries to take leadership will load these PIDs into its cache and hit OOM, until someone manually increases the memory of the brokers or the PID cleanup kicks in at some point.
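The arithmetic of this failover can be replayed with a few lines of code. This is only a toy model of the example above; the class name, the map of PID counts, and the failOver helper are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of the failover example: PID counts per broker and a fixed capacity. */
class FailoverOomSketch {
    static final int PID_CAPACITY = 200; // PIDs a broker can cache before OOM in the example

    /** Removes the failed broker, moves `moved` of its PIDs to `target`, returns the new total. */
    static int failOver(Map<String, Integer> pidsPerBroker, String failed, String target, int moved) {
        pidsPerBroker.remove(failed);
        return pidsPerBroker.merge(target, moved, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> pids = new LinkedHashMap<>();
        pids.put("Broker-1", 100);
        pids.put("Broker-2", 150);
        int total = failOver(pids, "Broker-1", "Broker-2", 60);
        System.out.println("Broker-2 holds " + total + " PIDs, over capacity: " + (total > PID_CAPACITY));
    }
}
```

None of the 60 migrated PIDs issues a new INIT_PRODUCER_ID request, which is why a quota on that request cannot stop Broker-2 from exceeding its capacity.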

5. Throttle PIDs based on IPs

Similar to solution #1, we will end up punishing good users, especially if the misbehaving producer is deployed on a K8S cluster that hosts other use cases.

6. Use HashSet to track PIDs in the caching layer instead of BloomFilter

HashSet provides 100% correctness; however, the growth of the caching layer with HashSet would create a risk of OOM. This is not as bad as the original OOM, as the broker wouldn't rebuild this cache at start-up; nonetheless, controlling the memory of the cache using HashSet would be a bit tricky and would need more configuration to keep it under control.
On the other hand, BloomFilter is more efficient when it comes to memory cost while providing reasonable correctness that is good enough for this use case.  And if we want to improve the correctness, we can always improve the false positive rate of the Bloom filter.

7. Use 2 alternating standard Bloom filters to track the PIDs.

Rather than building our own Bloom filter management structure, we should rely on the library from Apache commons-collections, as it seems to handle all the demands we have for Bloom filter management.

8. Use 2 layered bloom filter for the caching layer for the same proposed solution

When adding a PID to the layered Bloom filter, we check whether the PID is already in the filter.  If so, we don't add it and do not report it as new.  When the oldest layer reaches its expiration time it is removed, which means that ALL the PIDs in that filter are no longer recognized, even those we have just seen.  So long-running PIDs will be listed as new after the window expires.  The solution is to ensure that the PID is listed in a filter after the oldest one whenever there is more than 1 layer in the layered Bloom filter.  So with 2 layers, effectively all PIDs are always added to the filter.  However, with a multi-layer solution, the size of the Bloom filters can be smaller and we get better age resolution.  With 3 filters we have to check if the PID exists in any of the later filters.  The LayeredBloomFilter has a method that will return all the layers the PID is in, so we can add it to layer 3 if it only exists in layer 1, and not report it as a new PID.

9. Allow users to set low-level configs for the Bloom filter in the caching layer

Originally the KIP proposed `` and `` to control the cache layer, but these require the Kafka admin to understand the Bloom filter implementation, which is a bit of assumed knowledge. To simplify this proposal, these configs will be dropped for the time being.

