Status
Current state: Discussing
Discussion thread: https://lists.apache.org/thread/vkoqpq10xmvvzknfn65r394d577yogrk
JIRA: KAFKA-17967
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, when a Kafka producer is configured for idempotence (enable.idempotence=true), the configuration max.in.flight.requests.per.connection must be less than or equal to 5. This limitation exists because the Kafka broker hardcodes the number of distinct batch sequence numbers it retains in memory for deduplication (ProducerStateEntry#NUM_BATCHES_TO_RETAIN).
To guarantee idempotent behavior and message ordering, the number of in-flight requests from the producer must not exceed the broker’s ability to track the history of recent sequence numbers. If a producer attempts to send more concurrent requests than the broker tracks, the broker cannot effectively validate sequence gaps or duplicates in the event of out-of-order delivery, leading to OutOfOrderSequenceException or potential ordering violations.
To fully utilize the available bandwidth, the TCP window must be filled. However, the application-level limitation of 5 in-flight requests prevents the producer from filling the pipeline.
In scenarios like cross-region requests, hybrid cloud architectures, or multi-availability-zone deployments with higher internal latency, the network's Bandwidth-Delay Product (BDP) is high. As Round-Trip Time (RTT) increases, throughput drops roughly linearly because the number of in-flight requests is artificially capped at 5: with at most 5 requests of up to max.request.size bytes outstanding per round trip, per-connection throughput is bounded by about 5 * max.request.size / RTT, no matter how much bandwidth is available.
The original limit of 5 was chosen to conserve broker heap memory. However, the memory overhead of tracking sequence numbers is minimal: a BatchMetadata object (tracking sequence/offset) is around 36 bytes.
Current State: 5 batches * 36 bytes = 180 bytes per Producer ID (PID).
Proposed (e.g., 20 batches): 20 batches * 36 bytes = ~720 bytes per PID.
Even with 10,000 active producers on a single broker, the overhead increases from ~1.8 MB to ~7.2 MB. Modern Kafka deployments often run on instances with significant memory resources, so for many operators this overhead is negligible compared to the performance penalty incurred by the current throughput cap.
We propose making the broker-side deduplication buffer size configurable (or dynamic). This will allow operators to tune their clusters for high-latency environments by increasing the allowed max.in.flight.requests.per.connection for idempotent/transactional producers. This change will enable users to maintain exactly-once semantics and strict ordering without accepting a massive degradation in throughput over long-distance links.
The following tables compare throughput for max.in.flight.requests.per.connection values of 1, 5, and 10. The first table uses a server in AWS us-east-1 and a producer in AWS ap-northeast-1.
| max.in.flight.requests.per.connection | throughput | avg latency | max latency | 50th latency | 95th latency | 99th latency | 99.9th latency |
|---|---|---|---|---|---|---|---|
| 1 | 19.623234 records/sec | 25093.64 ms | 49788.00 ms | 25193 ms | 47286 ms | 49341 ms | 49788 ms |
| 5 | 95.129376 records/sec | 5111.89 ms | 10096.00 ms | 5135 ms | 9644 ms | 10095 ms | 10096 ms |
| 10 (temporary change hard code value in producer and broker) | 183.083120 records/sec | 2558.52 ms | 5036.00 ms | 2528 ms | 4749 ms | 5034 ms | 5036 ms |
The second table uses a server and producer communicating via localhost. As the results show, more in-flight requests still improve throughput even in a low-latency environment.
./bin/kafka-producer-perf-test.sh --topic quickstart-events --num-records 10000000 --throughput 1000000000 --record-size 1000 --transactional-id thisistest --bootstrap-server 127.0.0.1:9092 --command-property max.in.flight.requests.per.connection=x
| max.in.flight.requests.per.connection | throughput | avg latency | max latency | 50th latency | 95th latency | 99th latency | 99.9th latency |
|---|---|---|---|---|---|---|---|
| 1 | 198562.408165 records/sec | 126.01 ms | 464.00 ms | 147 ms | 171 ms | 260 ms | 430 ms |
| 5 | 285087.094107 records/sec | 2.81 ms | 127.00 ms | 0 ms | 14 ms | 68 ms | 108 ms |
| 10 (temporary change hard code value in producer and broker) | 307238.540002 records/sec | 2.14 ms | 76.00 ms | 0 ms | 8 ms | 45 ms | 73 ms |
Public Interfaces
Broker Configuration
Config name: max.idempotence.batches.to.retain
Config type: Int
Default value: 5
Constraint: at least 5
Introduce a new config on the broker, as the broker must know how much memory to allocate. Operators can set a broker-side limit to guard against misbehaving or malicious producers. This configuration only takes effect for idempotent/transactional producers. Prior to this KIP, idempotent/transactional producers are limited to a maximum of 5 for max.in.flight.requests.per.connection; setting a value greater than 5 on these older producers causes a ConfigException at initialization. To ensure backward compatibility, the new broker configuration max.idempotence.batches.to.retain must have a minimum constraint of 5.
This KIP preserves the existing default of 5 for both max.idempotence.batches.to.retain and max.in.flight.requests.per.connection. While higher defaults could improve throughput, such a change is not the goal of this KIP. We defer this discussion to a future proposal.
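For illustration, a minimal sketch of how this configuration could be declared with Kafka's ConfigDef, using the name, default, and minimum constraint listed above. The class and constant names are hypothetical; the actual wiring into the broker configuration is left to the implementation.
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;

// Sketch only: the proposed property with a default of 5 and an atLeast(5)
// validator, so values below 5 are rejected when the config is parsed.
public class IdempotenceConfigSketch {
    public static final String MAX_IDEMPOTENCE_BATCHES_TO_RETAIN_CONFIG = "max.idempotence.batches.to.retain";
    public static final int MAX_IDEMPOTENCE_BATCHES_TO_RETAIN_DEFAULT = 5;

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(MAX_IDEMPOTENCE_BATCHES_TO_RETAIN_CONFIG,
                Type.INT,
                MAX_IDEMPOTENCE_BATCHES_TO_RETAIN_DEFAULT,
                Range.atLeast(5),
                Importance.MEDIUM,
                "The maximum number of idempotence batches the broker retains in memory " +
                "for a producer to a topic partition.");
}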
ProduceRequest / ProduceResponse
Add a new tagged field MaxIdempotenceBatchesToRetain to the ProduceResponse so that the broker can report its max.idempotence.batches.to.retain configuration to producers.
{
"apiKey": 0,
"type": "request",
"listeners": ["broker"],
"name": "ProduceRequest",
// ...
- "validVersions": "3-13",
+ // Version 14 is the same as version 13 (KIP-1269).
+ "validVersions": "3-14",
// ...
}
{
"apiKey": 0,
"type": "response",
"name": "ProduceResponse",
// ...
- "validVersions": "3-13",
+ // Version 14 adds MaxIdempotenceBatchesToRetain as a tagged field (KIP-1269).
+ "validVersions": "3-14",
"flexibleVersions": "9+",
"fields": [
// ...
{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", "taggedVersions": "10+", "tag": 0,
"about": "Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.", "fields": [
{ "name": "NodeId", "type": "int32", "versions": "10+",
"mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
{ "name": "Host", "type": "string", "versions": "10+",
"about": "The node's hostname." },
{ "name": "Port", "type": "int32", "versions": "10+",
"about": "The node's port." },
{ "name": "Rack", "type": "string", "versions": "10+", "nullableVersions": "10+", "default": "null",
"about": "The rack of the node, or null if it has not been assigned to a rack." }
- ]}
+ ]},
+ { "name": "MaxIdempotenceBatchesToRetain", "type": "int32", "versions": "14+", "taggedVersions": "14+", "tag": 1, "default"
: "5",
+ "about": "The maximum number of idempotence batches the broker retains in memory for a producer to a topic partition."
+ }
]
}
Proposed Changes
Dynamic Capacity Discovery
Initial State (Safety First): When a connection is first established, the producer sends at most min(5, max.in.flight.requests.per.connection) in-flight requests to each node. Old brokers hardcode NUM_BATCHES_TO_RETAIN to 5, so the producer keeps this conservative limit until it discovers the broker's actual capability via the ProduceResponse. This ensures safety with any broker version and prevents sending more in-flight requests than an older broker can deduplicate.
Discovery: Upon receiving the first ProduceResponse from a partition leader, the producer checks for the presence of the MaxIdempotenceBatchesToRetain tagged field.
Adaptation:
If the field is present, the producer updates its local limit for that specific node connection to min(max.in.flight.requests.per.connection, MaxIdempotenceBatchesToRetain), as sketched below.
If the field is missing (older broker), the producer keeps the initial limit of min(5, max.in.flight.requests.per.connection).
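A minimal sketch of the per-node bookkeeping described above. The class and method names are hypothetical (the real change would live in the producer's network/sender path), but the min(...) logic and the fallback limit of 5 follow this KIP.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: tracks the effective in-flight request limit per broker node.
public class InFlightLimitTracker {
    // Safe limit for brokers that pre-date this KIP (NUM_BATCHES_TO_RETAIN = 5).
    private static final int COMPATIBILITY_LIMIT = 5;

    private final int configuredMaxInFlight; // max.in.flight.requests.per.connection
    private final Map<Integer, Integer> limitByNode = new ConcurrentHashMap<>();

    public InFlightLimitTracker(int configuredMaxInFlight) {
        this.configuredMaxInFlight = configuredMaxInFlight;
    }

    // Initial state: until the broker's capability is known, never exceed 5.
    public int limitFor(int nodeId) {
        return limitByNode.getOrDefault(nodeId,
                Math.min(COMPATIBILITY_LIMIT, configuredMaxInFlight));
    }

    // Discovery/adaptation: called when a ProduceResponse from this node carries
    // the MaxIdempotenceBatchesToRetain tagged field. If the field is absent
    // (older broker), this is never called and the initial limit stays in effect.
    public void onProduceResponse(int nodeId, int maxIdempotenceBatchesToRetain) {
        limitByNode.put(nodeId,
                Math.min(configuredMaxInFlight, maxIdempotenceBatchesToRetain));
    }
}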
Compatibility, Deprecation, and Migration Plan
New broker with old producer
The minimum value of max.idempotence.batches.to.retain is 5. Even if an old producer sets max.in.flight.requests.per.connection to its maximum of 5, it never exceeds the broker's retention capacity.
Old broker with new producer
The default value of MaxIdempotenceBatchesToRetain is 5, and an old broker never reports a higher limit, so a new producer talking to an old broker keeps the same constraint of at most 5 in-flight requests that applies today.
Test Plan
Unit test
Validate the new broker configuration, including rejection of values below 5 (see the sketch after this list).
Validate that the producer can learn MaxIdempotenceBatchesToRetain from the ProduceResponse and align with the broker.
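As an illustration of the first item, a possible unit test for the configuration constraint, reusing the hypothetical ConfigDef sketch from the Broker Configuration section; ConfigDef.parse applies the atLeast(5) validator and throws ConfigException for smaller values.
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class MaxIdempotenceBatchesToRetainConfigTest {

    @Test
    public void shouldRejectValuesBelowFive() {
        // Values below the minimum constraint of 5 must be rejected.
        assertThrows(ConfigException.class, () ->
            IdempotenceConfigSketch.CONFIG_DEF.parse(
                Map.of("max.idempotence.batches.to.retain", "4")));
    }

    @Test
    public void shouldAcceptLargerValues() {
        // Operators may raise the limit, e.g. to 20 for high-latency links.
        Map<String, Object> parsed = IdempotenceConfigSketch.CONFIG_DEF.parse(
            Map.of("max.idempotence.batches.to.retain", "20"));
        assertEquals(20, parsed.get("max.idempotence.batches.to.retain"));
    }
}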
Integration test
Producer with max.in.flight.requests.per.connection=1 and broker with max.idempotence.batches.to.retain=20: the producer sends at most 1 in-flight request.
Producer with max.in.flight.requests.per.connection=10 and broker with max.idempotence.batches.to.retain=5: the producer still sends at most 5 in-flight requests.
Rejected Alternatives
Hardcoded Increase (e.g., to 20)
Proposal: Just bump the hardcoded NUM_BATCHES_TO_RETAIN to 20 in the broker code.
Rejection Reason: While memory usage is generally low, Kafka is used in diverse environments. Forcing a 4x increase in per-producer memory overhead on all users, including those who do not need high-BDP throughput, is not acceptable. It must be opt-in/configurable.
Return MaxIdempotenceBatchesToRetain via InitProducerIdResponse
Proposal: Return the limit in the InitProducerIdResponse instead of the ProduceResponse.
Rejection Reason: Different brokers can have different values of max.idempotence.batches.to.retain. After getting a PID, the producer communicates with various partition leaders, and with this design it would have no chance to learn the limit on those other brokers.