Status
Current state: Voting in progress
Discussion thread: https://lists.apache.org/thread/vkoqpq10xmvvzknfn65r394d577yogrk
Vote thread: https://lists.apache.org/thread/dko0pykc8wdbwp0mzs2h2jj2zpg9vpty
JIRA: KAFKA-17967
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, when a Kafka producer is configured for idempotence (enable.idempotence=true), max.in.flight.requests.per.connection must be less than or equal to 5. This limitation exists because the Kafka broker hardcodes the number of distinct batch sequence numbers it retains in memory for deduplication (ProducerStateEntry#NUM_BATCHES_TO_RETAIN).
To guarantee idempotent behavior and message ordering, the number of in-flight requests from the producer must not exceed the broker’s ability to track the history of recent sequence numbers. If a producer attempts to send more concurrent requests than the broker tracks, the broker cannot effectively validate sequence gaps or duplicates in the event of out-of-order delivery, leading to OutOfOrderSequenceException or potential ordering violations.
To fully utilize the available bandwidth, the TCP window must be filled. However, the application-level limitation of 5 in-flight requests prevents the producer from filling the pipeline.
In scenarios such as cross-region requests, hybrid cloud architectures, or multi-availability-zone deployments with higher internal latency, the network's Bandwidth-Delay Product (BDP) is high. As Round-Trip Time (RTT) increases, throughput drops roughly in inverse proportion, because the number of in-flight requests is artificially capped at 5.
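The effect of the cap can be illustrated with a simple window model. The sketch below is not Kafka code: the class name, method name, request size, and RTT are all hypothetical, and the model ignores broker processing time and link bandwidth. It only assumes that a request slot is reused once its acknowledgement returns after one round trip.

```java
// Illustrative model only; not part of Kafka. All names and numbers are assumptions.
final class InFlightThroughputModel {

    /**
     * Upper bound on bytes per second for one connection when at most
     * maxInFlight requests of bytesPerRequest bytes may be unacknowledged
     * at once and each acknowledgement takes one round trip of rttMillis.
     */
    static double maxBytesPerSecond(int maxInFlight, long bytesPerRequest, double rttMillis) {
        if (maxInFlight <= 0 || bytesPerRequest <= 0 || rttMillis <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        return maxInFlight * (double) bytesPerRequest * 1000.0 / rttMillis;
    }

    public static void main(String[] args) {
        long bytesPerRequest = 16_384; // assumed request payload size
        double rttMillis = 200.0;      // assumed cross-region round-trip time
        for (int inFlight : new int[] {1, 5, 10, 20}) {
            System.out.printf("in-flight=%d -> at most %.0f bytes/sec%n",
                    inFlight, maxBytesPerSecond(inFlight, bytesPerRequest, rttMillis));
        }
    }
}
```

Under this model the ceiling scales with the number of in-flight requests and shrinks as RTT grows, which is why a fixed cap of 5 matters most on high-latency links.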
The original limit of 5 was chosen to conserve broker heap memory. However, the memory overhead for tracking sequence numbers is minimal. A BatchMetadata object (tracking sequence/offset) is around 36 bytes.
Current State: 5 batches * 36 bytes = 180 bytes per Producer ID (PID).
Proposed (e.g., 20 batches): 20 batches * 36 bytes = ~720 bytes per PID.
Even with 10,000 active producers on a single broker, the overhead increases from ~1.8 MB to ~7.2 MB. Modern Kafka deployments often run on instances with significant memory resources. For many operators, the memory overhead is negligible compared to the performance penalty incurred by the current throughput cap.
We propose making the broker-side deduplication buffer size configurable (or dynamic). This will allow operators to tune their clusters for high-latency environments by increasing the allowed max.in.flight.requests.per.connection for idempotent/transactional producers. This change will enable users to maintain exactly-once semantics and strict ordering without accepting a massive degradation in throughput over long-distance links.
The following tables compare throughput for max.in.flight.requests.per.connection values of 1, 5, and 10. The first table uses a server in AWS us-east-1 and a producer in AWS ap-northeast-1.
| max.in.flight.requests.per.connection | throughput | avg latency | max latency | 50th latency | 95th latency | 99th latency | 99.9th latency |
|---|---|---|---|---|---|---|---|
| 1 | 19.623234 records/sec | 25093.64 ms | 49788.00 ms | 25193 ms | 47286 ms | 49341 ms | 49788 ms |
| 5 | 95.129376 records/sec | 5111.89 ms | 10096.00 ms | 5135 ms | 9644 ms | 10095 ms | 10096 ms |
| 10 (hardcoded value temporarily changed in producer and broker) | 183.083120 records/sec | 2558.52 ms | 5036.00 ms | 2528 ms | 4749 ms | 5034 ms | 5036 ms |
The second table runs the server and the producer on localhost. The results show that more in-flight requests still improve throughput in a low-latency environment.
./bin/kafka-producer-perf-test.sh --topic quickstart-events --num-records 10000000 --throughput 1000000000 --record-size 1000 --transactional-id thisistest --bootstrap-server 127.0.0.1:9092 --command-property max.in.flight.requests.per.connection=x
| max.in.flight.requests.per.connection | throughput | avg latency | max latency | 50th latency | 95th latency | 99th latency | 99.9th latency |
|---|---|---|---|---|---|---|---|
| 1 | 198562.408165 records/sec | 126.01 ms | 464.00 ms | 147 ms | 171 ms | 260 ms | 430 ms |
| 5 | 285087.094107 records/sec | 2.81 ms | 127.00 ms | 0 ms | 14 ms | 68 ms | 108 ms |
| 10 (hardcoded value temporarily changed in producer and broker) | 307238.540002 records/sec | 2.14 ms | 76.00 ms | 0 ms | 8 ms | 45 ms | 73 ms |
Public Interfaces
Topic Configuration
- Config name: producer.state.batches.to.retain
- Server default config name: log.producer.state.batches.to.retain
- Config type: Int
- Default value: 5
- Constraint: at least 5
Introduce a new topic configuration producer.state.batches.to.retain, with a server default config log.producer.state.batches.to.retain. This configuration controls how many produce request metadata entries the broker retains per partition to support idempotent and transactional deduplication. Operators can set the server default on the broker to apply a cluster-wide baseline, or override it per topic to tune individual topics independently. This configuration only takes effect for idempotent/transactional producers. To ensure backward compatibility, producer.state.batches.to.retain must have a minimum constraint of 5. This preserves the behavior of existing producers, which assume the broker always retains at least 5 batch metadata entries.
This KIP preserves the existing default of 5 for both producer.state.batches.to.retain and max.in.flight.requests.per.connection. While higher defaults could improve throughput, such a change is not the goal of this KIP. We defer this discussion to a future proposal.
Producer Configuration
max.in.flight.requests.per.connection
Prior to this KIP, the Kafka producer client enforces a hard limit of 5 on max.in.flight.requests.per.connection when idempotence or transactions are enabled, because the broker hardcodes its deduplication window to 5 entries. Setting a higher value causes a ConfigException at producer initialization. This KIP removes that restriction. With the broker's deduplication window now configurable and discoverable per partition via ProduceResponse, the producer no longer needs a static upper bound enforced at startup. max.in.flight.requests.per.connection reverts to its original role: a pure connection-level cap on the total number of in-flight requests to a broker node, regardless of whether the producer is idempotent or transactional. The per-partition in-flight check described below takes over responsibility for ensuring the producer does not exceed the broker's deduplication window for any individual partition.
ProduceRequest / ProduceResponse
Add a new field, ProducerStateBatchesToRetain, to ProduceResponse. It reports the producer.state.batches.to.retain configuration of the topic partition.
{
"apiKey": 0,
"type": "request",
"listeners": ["broker"],
"name": "ProduceRequest",
// ...
- "validVersions": "3-13",
+ // Version 14 is the same as version 13 (KIP-1269).
+ "validVersions": "3-14",
// ...
}
{
"apiKey": 0,
"type": "response",
"name": "ProduceResponse",
// ...
- "validVersions": "3-13",
+ // Version 14 adds ProducerStateBatchesToRetain as a tagged field (KIP-1269).
+ "validVersions": "3-14",
"flexibleVersions": "9+",
"fields": [
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
"about": "Each produce response.", "fields": [
{ "name": "Name", "type": "string", "versions": "0-12", "entityType": "topicName", "mapKey": true, "ignorable": true,
"about": "The topic name." },
{ "name": "TopicId", "type": "uuid", "versions": "13+", "mapKey": true, "ignorable": true, "about": "The unique topic ID" },
// ...
- ]}
+ ]},
+ { "name": "ProducerStateBatchesToRetain", "type": "int32",
+ "versions": "14+", "taggedVersions": "14+", "tag": 1, "default": "5",
+ "about": "The maximum number of idempotent batches to retain in the broker." }
]},
// ...
]
}
Proposed Changes
Two-Level In-Flight Check
To support per-topic deduplication window sizes, the producer enforces two independent in-flight checks before sending a batch. The first is a per-partition check: for each topic partition, the producer tracks the number of batches currently in-flight to that partition and ensures it does not exceed the partition's discovered ProducerStateBatchesToRetain limit. This enforces the broker's deduplication window on a per-partition basis, so that a topic configured with a larger window does not interfere with a topic configured with a smaller one. The second is the existing per-connection check: the total number of in-flight requests to a broker node must not exceed max.in.flight.requests.per.connection. Both checks must pass before a batch is sent. The per-partition check is the new constraint introduced by this KIP; the per-connection check is unchanged from existing behavior. The per-partition limit is initially 5 and is updated dynamically as the producer receives ProduceResponse messages, as described in the Dynamic Capacity Discovery section below.
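The two checks described above can be sketched as follows. This is a minimal illustration, not the producer's actual implementation: the class and method names are hypothetical, partitions are identified by plain strings, and the sketch assumes each request carries exactly one batch for one partition, which a real producer does not guarantee.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two-level in-flight check for a single broker node.
final class TwoLevelInFlightGate {
    static final int DEFAULT_PARTITION_LIMIT = 5; // safe default before discovery

    private final int maxInFlightPerConnection;
    private int inFlightOnConnection = 0;
    private final Map<String, Integer> partitionLimits = new HashMap<>();
    private final Map<String, Integer> inFlightPerPartition = new HashMap<>();

    TwoLevelInFlightGate(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    /** Records a limit learned for a partition (for example, from a response). */
    void setPartitionLimit(String topicPartition, int limit) {
        partitionLimits.put(topicPartition, limit);
    }

    /** Both the per-partition and the per-connection check must pass. */
    boolean canSend(String topicPartition) {
        int limit = partitionLimits.getOrDefault(topicPartition, DEFAULT_PARTITION_LIMIT);
        int inFlight = inFlightPerPartition.getOrDefault(topicPartition, 0);
        return inFlight < limit && inFlightOnConnection < maxInFlightPerConnection;
    }

    void onSend(String topicPartition) {
        if (!canSend(topicPartition)) {
            throw new IllegalStateException("in-flight limit reached for " + topicPartition);
        }
        inFlightPerPartition.merge(topicPartition, 1, Integer::sum);
        inFlightOnConnection++;
    }

    void onComplete(String topicPartition) {
        int current = inFlightPerPartition.getOrDefault(topicPartition, 0);
        if (current == 0) {
            throw new IllegalStateException("nothing in flight for " + topicPartition);
        }
        inFlightPerPartition.put(topicPartition, current - 1);
        inFlightOnConnection--;
    }

    public static void main(String[] args) {
        TwoLevelInFlightGate gate = new TwoLevelInFlightGate(20);
        for (int i = 0; i < 5; i++) {
            gate.onSend("small-0");
        }
        System.out.println("small-0 can send: " + gate.canSend("small-0"));
        System.out.println("large-0 can send: " + gate.canSend("large-0"));
    }
}
```

In this sketch a partition that has reached its own limit does not block other partitions on the same connection until the connection-level cap is also reached.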
Dynamic Capacity Discovery
Initial State (Safety First): When a connection is first established, the per-partition in-flight limit defaults to 5 for all partitions on that node. This ensures safe behavior with any broker version, since old brokers hardcode their deduplication window to 5 entries. The per-connection limit remains max.in.flight.requests.per.connection as configured by the user.
Discovery: Upon receiving a ProduceResponse from a partition leader, the producer reads the ProducerStateBatchesToRetain tagged field from each PartitionProduceResponse. This value reflects the producer.state.batches.to.retain configuration of that specific topic partition on the leader broker.
Adaptation:
- If the field is present, the producer updates the per-partition in-flight limit for that partition to the received ProducerStateBatchesToRetain value.
- If the field is absent, the producer retains the default per-partition limit of 5 for all partitions on that node.
Leader Change: When the producer detects a partition leader change, the producer resets the per-partition in-flight limit for that partition back to 5 and re-enters the discovery process with the new leader. This is necessary because the new leader may be an old broker that does not recognize producer.state.batches.to.retain and hardcodes its deduplication window to 5. Resetting to 5 on every leader change ensures correctness regardless of whether the new leader is an upgraded or non-upgraded broker, and the producer will rediscover the correct limit from the new leader's first ProduceResponse.
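The discovery lifecycle above can be sketched as a small state holder. The class and method names below are hypothetical and do not reflect the producer's actual internals. The sketch models a missing tagged field as an empty OptionalInt and treats it as the schema default of 5.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical sketch of per-partition limit discovery; not the actual client code.
final class PartitionLimitTracker {
    static final int SAFE_DEFAULT = 5; // matches the hardcoded window of old brokers

    private final Map<String, Integer> limits = new HashMap<>();

    /** Current in-flight limit for a partition; 5 until a leader reports otherwise. */
    int limitFor(String topicPartition) {
        return limits.getOrDefault(topicPartition, SAFE_DEFAULT);
    }

    /** Adaptation: adopt the reported value, or stay at 5 if the field is absent. */
    void onProduceResponse(String topicPartition, OptionalInt batchesToRetain) {
        limits.put(topicPartition, batchesToRetain.orElse(SAFE_DEFAULT));
    }

    /** Leader change: forget the learned value so discovery starts over at 5. */
    void onLeaderChange(String topicPartition) {
        limits.remove(topicPartition);
    }

    public static void main(String[] args) {
        PartitionLimitTracker tracker = new PartitionLimitTracker();
        System.out.println("initial: " + tracker.limitFor("events-0"));
        tracker.onProduceResponse("events-0", OptionalInt.of(20));
        System.out.println("after discovery: " + tracker.limitFor("events-0"));
        tracker.onLeaderChange("events-0");
        System.out.println("after leader change: " + tracker.limitFor("events-0"));
    }
}
```

Resetting on every leader change trades a brief throughput dip for safety: the producer never carries a larger limit over to a leader that may only retain 5 entries.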
Compatibility, Deprecation, and Migration Plan
New topic configuration with old broker
The producer.state.batches.to.retain configuration has no effect on old brokers. Old brokers do not recognize this topic configuration and will continue to use their hardcoded deduplication window of 5. Even if an operator sets producer.state.batches.to.retain to a value greater than 5 on a topic, the old broker ignores it and the ProducerStateBatchesToRetain field keeps its default value in PartitionProduceResponse. The new producer will therefore fall back to the default per-partition in-flight limit of 5 for all partitions on that node, as described in the Dynamic Capacity Discovery section. Operators should ensure all brokers in the cluster are upgraded before relying on values greater than 5.
New broker with old producer
The minimum value of producer.state.batches.to.retain is 5, which matches the deduplication window old producers assume. Old producers are still subject to the existing ConfigException at initialization if max.in.flight.requests.per.connection is set greater than 5 with idempotence or transactions enabled. Their behavior on the broker side is unchanged.
Old broker with new producer
When a new producer connects to an old broker, the old broker doesn't set the ProducerStateBatchesToRetain in ProduceResponse. The default value is 5, so the new producer retains the default per-partition in-flight limit of 5 for all partitions on that node, which matches the old broker's hardcoded deduplication window. The removal of the startup ConfigException for idempotent producers still takes effect on the producer side, but since the discovered per-partition limit defaults to 5, the producer will not send more than 5 in-flight batches per partition to an old broker, preserving correctness.
Test Plan
Unit test
- Validate the default and minimum values of the new configuration.
- Validate that the producer can learn ProducerStateBatchesToRetain from ProduceResponse and align with the partition leader.
- Validate that the producer resets the per-partition in-flight limit to 5 upon detecting a partition leader change, and rediscovers the limit from the new leader's first ProduceResponse.
- Validate that idempotent and transactional producers no longer throw ConfigException when max.in.flight.requests.per.connection is set greater than 5.
Integration test
- Producer with max.in.flight.requests.per.connection=1 and topic with producer.state.batches.to.retain=20: producer sends at most 1 in-flight request per partition, bounded by the connection-level check.
- Producer with max.in.flight.requests.per.connection=20 and topic with producer.state.batches.to.retain=5: producer sends at most 5 in-flight batches per partition, bounded by the per-partition check.
- Producer with max.in.flight.requests.per.connection=20 and two topics on the same broker, one with producer.state.batches.to.retain=5 and another with producer.state.batches.to.retain=20: producer respects each partition's limit independently without one topic affecting the other.
- Producer connected to an old broker with producer.state.batches.to.retain=20 set on the topic: producer falls back to a per-partition limit of 5 since the old broker does not return ProducerStateBatchesToRetain in PartitionProduceResponse.
- Upon partition leader change from a new broker to an old broker: producer resets the per-partition limit to 5 and does not exceed the old broker's hardcoded deduplication window.
Rejected Alternatives
Hardcoded Increase (e.g., to 20)
Proposal: Just bump the hardcoded NUM_BATCHES_TO_RETAIN to 20 in the broker code.
Rejection Reason: While memory usage is generally low, Kafka is used in diverse environments. Forcing a 4x increase in per-producer memory overhead on all users, including those who don't need high-BDP throughput, is not acceptable. The increase must be opt-in and configurable.
Return ProducerStateBatchesToRetain via InitProducerIdResponse
Proposal: Return the limit in the InitProducerIdResponse instead of ProduceResponse.
Rejection Reason: Different brokers can have different producer.state.batches.to.retain values. After getting a PID, the producer can communicate with various partition leaders. In this design, the producer has no chance to learn the limits of the different brokers.