Status

Current state: Discussing

Discussion thread: https://lists.apache.org/thread/vkoqpq10xmvvzknfn65r394d577yogrk

JIRA: KAFKA-17967

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when a Kafka producer is configured for idempotence (enable.idempotence=true), max.in.flight.requests.per.connection must be at most 5. This limitation exists because the Kafka broker hardcodes the number of distinct batch sequence numbers it retains in memory for deduplication (ProducerStateEntry#NUM_BATCHES_TO_RETAIN).

To guarantee idempotent behavior and message ordering, the number of in-flight requests from the producer must not exceed the broker’s ability to track the history of recent sequence numbers. If a producer attempts to send more concurrent requests than the broker tracks, the broker cannot effectively validate sequence gaps or duplicates in the event of out-of-order delivery, leading to OutOfOrderSequenceException or potential ordering violations.
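The deduplication window described above can be illustrated with a minimal sketch. This is not Kafka's actual code (the broker tracks full sequence ranges per batch, not just first sequences); the class and method names are invented for illustration:

```python
# Illustrative sketch of why the broker must retain recent batch sequence
# numbers: with only the last N sequences cached, duplicates and gaps can
# only be detected within that window. Not Kafka's actual implementation.
from collections import deque

class PartitionProducerState:
    def __init__(self, num_batches_to_retain=5):
        # Last N batch sequence numbers seen for this producer/partition.
        self.recent = deque(maxlen=num_batches_to_retain)

    def validate(self, seq):
        if seq in self.recent:
            return "DUPLICATE"        # retry of an acked batch: safe to ack again
        if self.recent and seq != self.recent[-1] + 1:
            return "OUT_OF_ORDER"     # a gap the broker cannot reconcile
        self.recent.append(seq)
        return "OK"

state = PartitionProducerState()
assert state.validate(0) == "OK"
assert state.validate(1) == "OK"
assert state.validate(1) == "DUPLICATE"      # retried batch is deduplicated
assert state.validate(3) == "OUT_OF_ORDER"   # sequence 2 is missing
```

A producer with more in-flight requests than the broker's retained window can produce retries that fall outside `recent`, which is exactly the failure mode the 5-request cap prevents.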

To fully utilize the available bandwidth, the TCP window must be filled. However, the application-level limitation of 5 in-flight requests prevents the producer from filling the pipeline.

$$\text{Throughput} \approx \frac{\text{Request Size} \times \text{In Flight Requests}}{\text{Network RTT}}$$

In scenarios like cross-region requests, hybrid cloud architectures, or multi-availability zone deployments with higher internal latency, the network's Bandwidth-Delay Product (BDP) is high. As Round-Trip Time (RTT) increases, throughput drops proportionally because In Flight Requests is artificially capped at 5.
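The formula above can be made concrete with a quick calculation. The request size (1 MB) and RTT (150 ms, roughly a cross-Pacific round trip) are illustrative numbers, not measurements from the KIP:

```python
# Worked example of the throughput bound above: the pipeline can hold at
# most `in_flight_requests` requests per round trip, so per-connection
# throughput is capped regardless of available bandwidth.
def throughput_mb_per_s(request_size_mb, in_flight_requests, rtt_seconds):
    return request_size_mb * in_flight_requests / rtt_seconds

capped = throughput_mb_per_s(1.0, 5, 0.150)   # current cap of 5 in-flight requests
raised = throughput_mb_per_s(1.0, 20, 0.150)  # hypothetical cap of 20
print(f"{capped:.1f} MB/s vs {raised:.1f} MB/s")  # → 33.3 MB/s vs 133.3 MB/s
```

At a fixed RTT, raising the in-flight cap from 5 to 20 quadruples the ceiling; the same producer on a 1 ms LAN link would be bandwidth-bound long before hitting either cap.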

The original limit of 5 was chosen to conserve broker heap memory. However, the memory overhead for tracking sequence numbers is minimal: a BatchMetadata object (tracking sequence/offset) is around 36 bytes.

  • Current State: 5 batches * 36 bytes = 180 bytes per Producer ID (PID).

  • Proposed (e.g., 20 batches): 20 batches * 36 bytes = ~720 bytes per PID.

Even with 10,000 active producers on a single broker, the overhead increases from ~1.8 MB to ~7.2 MB. Modern Kafka deployments often run on instances with significant memory resources, so for many operators this overhead is negligible compared to the performance penalty incurred by the current throughput cap.
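The memory figures above follow directly from the per-entry estimate. A back-of-the-envelope check:

```python
# Deduplication-state overhead per broker, using the KIP's estimate of
# ~36 bytes per retained BatchMetadata entry.
BATCH_METADATA_BYTES = 36

def dedup_overhead_bytes(batches_retained, active_pids):
    return batches_retained * BATCH_METADATA_BYTES * active_pids

current = dedup_overhead_bytes(5, 10_000)    # 1_800_000 bytes ≈ 1.8 MB
proposed = dedup_overhead_bytes(20, 10_000)  # 7_200_000 bytes ≈ 7.2 MB
```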

We propose making the broker-side deduplication buffer size configurable (or dynamic). This will allow operators to tune their clusters for high-latency environments by increasing the allowed max.in.flight.requests.per.connection for idempotent/transactional producers. This change will enable users to maintain exactly-once semantics and strict ordering without accepting a massive degradation in throughput over long-distance links.

The following tables show the throughput difference among max.in.flight.requests.per.connection values of 1, 5, and 10. The first table uses a server in AWS us-east-1 and a producer in AWS ap-northeast-1.

| max.in.flight.requests.per.connection | throughput | avg latency | max latency | 50th latency | 95th latency | 99th latency | 99.9th latency |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | 19.623234 records/sec | 25093.64 ms | 49788.00 ms | 25193 ms | 47286 ms | 49341 ms | 49788 ms |
| 5 | 95.129376 records/sec | 5111.89 ms | 10096.00 ms | 5135 ms | 9644 ms | 10095 ms | 10096 ms |
| 10 (hardcoded value temporarily changed in producer and broker) | 183.083120 records/sec | 2558.52 ms | 5036.00 ms | 2528 ms | 4749 ms | 5034 ms | 5036 ms |

The second table uses a server and producer on localhost. As the results show, more in-flight requests still improve throughput even in a low-latency environment.

./bin/kafka-producer-perf-test.sh --topic quickstart-events --num-records 10000000 --throughput 1000000000 --record-size 1000 --transactional-id thisistest --bootstrap-server 127.0.0.1:9092 --producer-props max.in.flight.requests.per.connection=x

| max.in.flight.requests.per.connection | throughput | avg latency | max latency | 50th latency | 95th latency | 99th latency | 99.9th latency |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | 198562.408165 records/sec | 126.01 ms | 464.00 ms | 147 ms | 171 ms | 260 ms | 430 ms |
| 5 | 285087.094107 records/sec | 2.81 ms | 127.00 ms | 0 ms | 14 ms | 68 ms | 108 ms |
| 10 (hardcoded value temporarily changed in producer and broker) | 307238.540002 records/sec | 2.14 ms | 76.00 ms | 0 ms | 8 ms | 45 ms | 73 ms |

Public Interfaces

Broker Configuration

  • Config name: max.idempotence.batches.to.retain

  • Config type: Int

  • Default value: 5

  • Constraint: at least 5

Introduce a new config on the broker, since the broker must know how much memory to allocate. Operators can also use it to cap the memory a misbehaving producer can consume on the broker side. This configuration only takes effect for idempotent/transactional producers. Prior to this KIP, idempotent/transactional producers are limited to a maximum of 5 for max.in.flight.requests.per.connection; setting a value greater than 5 on these older producers causes a ConfigException at initialization. To ensure backward compatibility, the new broker configuration max.idempotence.batches.to.retain must therefore have a minimum constraint of 5.

This KIP preserves the existing default of 5 for both max.idempotence.batches.to.retain and max.in.flight.requests.per.connection. While higher defaults could improve throughput, such a change is not the goal of this KIP. We defer this discussion to a future proposal.
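For example, an operator serving cross-region producers might raise the broker-side limit and then raise the producer setting to match. The value 20 here is illustrative, not a recommendation from this KIP:

```properties
# server.properties (broker): retain up to 20 batch metadata entries per PID
max.idempotence.batches.to.retain=20

# producer config: an idempotent producer may now pipeline up to 20 requests
# per connection (effective only once the producer discovers the broker limit)
enable.idempotence=true
max.in.flight.requests.per.connection=20
```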

ProduceRequest / ProduceResponse

Add a new tagged field MaxIdempotenceBatchesToRetain to the ProduceResponse, reflecting the broker configuration max.idempotence.batches.to.retain.

ProduceRequest
 {
   "apiKey": 0,
   "type": "request",
   "listeners": ["broker"],
   "name": "ProduceRequest",
   // ...
-  "validVersions": "3-13",
+  // Version 14 is the same as version 13 (KIP-1269).
+  "validVersions": "3-14",
   // ...
 }


ProduceResponse
 {
   "apiKey": 0,
   "type": "response",
   "name": "ProduceResponse",
   // ...
-  "validVersions": "3-13",
+  // Version 14 adds MaxIdempotenceBatchesToRetain as a tagged field (KIP-1269).
+  "validVersions": "3-14",
   "flexibleVersions": "9+",
   "fields": [
     // ...
     { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", "taggedVersions": "10+", "tag": 0,
       "about": "Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.", "fields": [
       { "name": "NodeId", "type": "int32", "versions": "10+",
         "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
       { "name": "Host", "type": "string", "versions": "10+",
         "about": "The node's hostname." },
       { "name": "Port", "type": "int32", "versions": "10+",
         "about": "The node's port." },
       { "name": "Rack", "type": "string", "versions": "10+", "nullableVersions": "10+", "default": "null",
         "about": "The rack of the node, or null if it has not been assigned to a rack." }
-    ]}
+    ]},
+    { "name": "MaxIdempotenceBatchesToRetain", "type": "int32", "versions": "14+", "taggedVersions": "14+", "tag": 1, "default": "5",
+      "about": "The maximum number of idempotence batches the broker retains in memory for a producer to a topic partition."
+    }
   ]
 }


Proposed Changes

Dynamic Capacity Discovery

Initial State (Safety First): Old brokers hardcode NUM_BATCHES_TO_RETAIN to 5. To ensure compatibility with any broker version, when a connection is first established the producer limits in-flight requests to min(5, max.in.flight.requests.per.connection) per node until it discovers the broker's actual capability via the ProduceResponse. This prevents sending excessive in-flight requests to older brokers that cannot handle them.

Discovery: Upon receiving the first ProduceResponse from a partition leader, the producer checks for the presence of the MaxIdempotenceBatchesToRetain tagged field.

Adaptation:

  • If the field is present, the producer updates its local limit for that specific node connection to min(max.in.flight.requests.per.connection, MaxIdempotenceBatchesToRetain).

  • If the field is missing (older broker), the producer keeps the initial limit of min(5, max.in.flight.requests.per.connection).
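The discovery and adaptation steps above can be sketched as follows. This is an illustration of the proposed behavior, not Kafka's actual producer code; the class and field names are invented:

```python
# Minimal sketch of per-node in-flight limit adaptation as described above.
# `max_in_flight` mirrors the producer config
# max.in.flight.requests.per.connection; names are illustrative.
LEGACY_BROKER_LIMIT = 5  # NUM_BATCHES_TO_RETAIN hardcoded in brokers predating this KIP

class NodeInFlightLimit:
    def __init__(self, max_in_flight):
        self.max_in_flight = max_in_flight
        # Safety first: assume an old broker until the first ProduceResponse.
        self.limit = min(LEGACY_BROKER_LIMIT, max_in_flight)

    def on_produce_response(self, max_idempotence_batches_to_retain=None):
        """Apply the MaxIdempotenceBatchesToRetain tagged field from a
        ProduceResponse; None means the broker is too old to send it."""
        if max_idempotence_batches_to_retain is not None:
            self.limit = min(self.max_in_flight, max_idempotence_batches_to_retain)
        # If the field is absent, keep the conservative initial limit.

node = NodeInFlightLimit(max_in_flight=20)
assert node.limit == 5        # before discovery: capped at the legacy limit
node.on_produce_response(20)
assert node.limit == 20       # broker advertises capacity for 20
```

Note that the limit is per node: a producer talking to a mixed-version cluster keeps the conservative limit of 5 for connections whose responses omit the field.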

Compatibility, Deprecation, and Migration Plan

New broker with old producer

The minimum value of max.idempotence.batches.to.retain is 5. Since an old producer can set max.in.flight.requests.per.connection to at most 5, it can never exceed the broker's retained batch count.

Old broker with new producer

An old broker does not return MaxIdempotenceBatchesToRetain, so the producer falls back to the default of 5, matching the constraint hardcoded in old brokers.


Test Plan

  • Unit test

    • Validate new configuration.

    • Validate producer can learn MaxIdempotenceBatchesToRetain from ProduceResponse and align with the broker.

  • Integration test

    • Producer with max.in.flight.requests.per.connection=1 and broker with max.idempotence.batches.to.retain=20: producer sends at most 1 in-flight request.

    • Producer with max.in.flight.requests.per.connection=10 and broker with max.idempotence.batches.to.retain=5: producer still sends at most 5 in-flight requests.

Rejected Alternatives

Hardcoded Increase (e.g., to 20)

  • Proposal: Just bump the hardcoded NUM_BATCHES_TO_RETAIN to 20 in the broker code.

  • Rejection Reason: While memory usage is generally low, Kafka is used in diverse environments. Forcing a 4x increase in per-producer memory overhead for all users—including those who don't need high BDP throughput—is not acceptable. It must be opt-in/configurable.

Return MaxIdempotenceBatchesToRetain via InitProducerIdResponse

  • Proposal: Return the limit in the InitProducerIdResponse instead of ProduceResponse.

  • Rejection Reason: Different brokers can be configured with different max.idempotence.batches.to.retain values. After obtaining a PID, the producer communicates with various partition leaders; with this design, it would have no chance to learn the limit configured on each broker.
