Status

Current state: Voting in progress

Discussion thread: https://lists.apache.org/thread/vkoqpq10xmvvzknfn65r394d577yogrk

Vote thread: https://lists.apache.org/thread/dko0pykc8wdbwp0mzs2h2jj2zpg9vpty

JIRA: KAFKA-17967

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when a Kafka producer is configured for idempotence (enable.idempotence=true), max.in.flight.requests.per.connection must be less than or equal to 5. This limitation exists because the Kafka broker hardcodes the number of distinct batch sequence numbers it retains in memory for deduplication (ProducerStateEntry#NUM_BATCHES_TO_RETAIN).

To guarantee idempotent behavior and message ordering, the number of in-flight requests from the producer must not exceed the broker’s ability to track the history of recent sequence numbers. If a producer attempts to send more concurrent requests than the broker tracks, the broker cannot effectively validate sequence gaps or duplicates in the event of out-of-order delivery, leading to OutOfOrderSequenceException or potential ordering violations.

To fully utilize the available bandwidth, the TCP window must be filled. However, the application-level limitation of 5 in-flight requests prevents the producer from filling the pipeline.

$$\text{Throughput} \approx \frac{\text{Request Size} \times \text{In Flight Requests}}{\text{Network RTT}}$$

In scenarios like cross-region requests, hybrid cloud architectures, or multi-availability-zone deployments with higher internal latency, the network's Bandwidth-Delay Product (BDP) is high. As Round-Trip Time (RTT) increases, throughput drops in inverse proportion, because In Flight Requests is artificially capped at 5.
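The relationship above can be made concrete with a small calculation. The following is only a back-of-the-envelope sketch of the formula; the class name, the 16 KiB request size, and the 200 ms RTT are hypothetical values chosen for illustration and are not measurements from this KIP.

```java
/** Rough estimate of the throughput formula above; not a benchmark. */
final class ThroughputEstimate {

    /** Approximate bytes per second: requestSize * inFlightRequests / RTT. */
    static double bytesPerSecond(long requestSizeBytes, int inFlightRequests, double rttSeconds) {
        if (rttSeconds <= 0) {
            throw new IllegalArgumentException("rttSeconds must be positive");
        }
        return requestSizeBytes * (double) inFlightRequests / rttSeconds;
    }

    public static void main(String[] args) {
        long requestSizeBytes = 16_384; // hypothetical 16 KiB produce request
        double rttSeconds = 0.2;        // hypothetical 200 ms cross-region round trip
        for (int inFlight : new int[] {1, 5, 10, 20}) {
            System.out.printf("in-flight=%d -> ~%.0f bytes/sec%n",
                    inFlight, bytesPerSecond(requestSizeBytes, inFlight, rttSeconds));
        }
    }
}
```

With the request size and RTT held fixed, the estimate scales linearly with the number of in-flight requests, which is why a cap of 5 bounds throughput on high-latency links.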

The original limit of 5 was chosen to conserve broker heap memory. However, the memory overhead for tracking sequence numbers is minimal: a BatchMetadata object (tracking sequence/offset) is around 36 bytes.

  • Current State: 5 batches * 36 bytes = 180 bytes per Producer ID (PID).

  • Proposed (e.g., 20 batches): 20 batches * 36 bytes = ~720 bytes per PID.

Even with 10,000 active producers on a single broker, the overhead increases from ~1.8 MB to ~7.2 MB. Modern Kafka deployments often run on instances with significant memory resources, so for many operators this memory overhead is negligible compared to the performance penalty incurred by the current throughput cap.
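The arithmetic above can be reproduced with a few lines of code. This is a minimal sketch that mirrors the estimate in the text (about 36 bytes per BatchMetadata entry, counted once per producer ID); the class and method names are hypothetical.

```java
/** Reproduces the memory estimate above; the 36-byte figure is the approximation quoted in the text. */
final class DedupMemoryEstimate {
    static final int BATCH_METADATA_BYTES = 36;

    /** Bytes retained for one producer ID. */
    static long bytesPerProducer(int batchesToRetain) {
        return (long) batchesToRetain * BATCH_METADATA_BYTES;
    }

    /** Bytes retained across all active producer IDs on a broker. */
    static long totalBytes(int batchesToRetain, int activeProducers) {
        return bytesPerProducer(batchesToRetain) * activeProducers;
    }

    public static void main(String[] args) {
        int activeProducers = 10_000;
        for (int batches : new int[] {5, 20}) {
            System.out.printf("batches=%d: %d bytes per PID, %d bytes total%n",
                    batches, bytesPerProducer(batches), totalBytes(batches, activeProducers));
        }
    }
}
```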

We propose making the broker-side deduplication buffer size configurable (or dynamic). This will allow operators to tune their clusters for high-latency environments by increasing the allowed max.in.flight.requests.per.connection for idempotent/transactional producers. This change will enable users to maintain exactly-once semantics and strict ordering without accepting a massive degradation in throughput over long-distance links.

The following tables compare throughput for max.in.flight.requests.per.connection values of 1, 5, and 10. The first table uses a server in AWS us-east-1 and a producer in AWS ap-northeast-1.

| max.in.flight.requests.per.connection | throughput | avg latency | max latency | 50th latency | 95th latency | 99th latency | 99.9th latency |
|---|---|---|---|---|---|---|---|
| 1 | 19.623234 records/sec | 25093.64 ms | 49788.00 ms | 25193 ms | 47286 ms | 49341 ms | 49788 ms |
| 5 | 95.129376 records/sec | 5111.89 ms | 10096.00 ms | 5135 ms | 9644 ms | 10095 ms | 10096 ms |
| 10 (temporary change to the hardcoded value in producer and broker) | 183.083120 records/sec | 2558.52 ms | 5036.00 ms | 2528 ms | 4749 ms | 5034 ms | 5036 ms |

The second table runs the server and the producer on the same host via localhost, using the command below. The results show that more in-flight requests still improve throughput in a low-latency environment.

./bin/kafka-producer-perf-test.sh --topic quickstart-events --num-records 10000000 --throughput 1000000000 --record-size 1000 --transactional-id thisistest --bootstrap-server 127.0.0.1:9092 --command-property max.in.flight.requests.per.connection=x

| max.in.flight.requests.per.connection | throughput | avg latency | max latency | 50th latency | 95th latency | 99th latency | 99.9th latency |
|---|---|---|---|---|---|---|---|
| 1 | 198562.408165 records/sec | 126.01 ms | 464.00 ms | 147 ms | 171 ms | 260 ms | 430 ms |
| 5 | 285087.094107 records/sec | 2.81 ms | 127.00 ms | 0 ms | 14 ms | 68 ms | 108 ms |
| 10 (temporary change to the hardcoded value in producer and broker) | 307238.540002 records/sec | 2.14 ms | 76.00 ms | 0 ms | 8 ms | 45 ms | 73 ms |

Public Interfaces

Topic Configuration

  • Config name: producer.state.batches.to.retain
  • Server default config name: log.producer.state.batches.to.retain
  • Config type: Int
  • Default value: 5
  • Constraint: at least 5

Introduce a new topic configuration producer.state.batches.to.retain, with a server default config log.producer.state.batches.to.retain. This configuration controls how many produce request metadata entries the broker retains per partition to support idempotent and transactional deduplication. Operators can set the server default on the broker to apply a cluster-wide baseline, or override it per topic to tune individual topics independently. This configuration only takes effect for idempotent/transactional producers. To ensure backward compatibility, producer.state.batches.to.retain must have a minimum constraint of 5. This preserves the behavior of existing producers, which assume the broker always retains at least 5 batch metadata entries.
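The resolution and validation rules described above could be sketched as follows. This is an illustration only, not the broker implementation: the class name, method names, and the use of OptionalInt to model a missing topic-level override are assumptions made for this example.

```java
import java.util.OptionalInt;

/** Hypothetical sketch of how the effective value could be resolved and validated. */
final class BatchesToRetainConfig {
    /** Minimum allowed value, matching the window that existing producers assume. */
    static final int MIN_BATCHES_TO_RETAIN = 5;

    /** Rejects values below the minimum constraint of 5. */
    static int validate(int value) {
        if (value < MIN_BATCHES_TO_RETAIN) {
            throw new IllegalArgumentException(
                    "producer.state.batches.to.retain must be at least " + MIN_BATCHES_TO_RETAIN
                            + ", got " + value);
        }
        return value;
    }

    /**
     * A topic-level override (producer.state.batches.to.retain) wins when present;
     * otherwise the server default (log.producer.state.batches.to.retain) applies.
     */
    static int effectiveValue(OptionalInt topicOverride, int serverDefault) {
        return validate(topicOverride.orElse(serverDefault));
    }

    public static void main(String[] args) {
        System.out.println(effectiveValue(OptionalInt.empty(), 5)); // no override: server default
        System.out.println(effectiveValue(OptionalInt.of(20), 5));  // topic override applies
    }
}
```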

This KIP preserves the existing default of 5 for both producer.state.batches.to.retain and max.in.flight.requests.per.connection. While higher defaults could improve throughput, such a change is not the goal of this KIP. We defer this discussion to a future proposal.

Producer Configuration

max.in.flight.requests.per.connection

Prior to this KIP, the Kafka producer client enforces a hard limit of 5 on max.in.flight.requests.per.connection when idempotence or transactions are enabled, because the broker hardcodes its deduplication window to 5 entries. Setting a higher value causes a ConfigException at producer initialization. This KIP removes that restriction. With the broker's deduplication window now configurable and discoverable per partition via ProduceResponse, the producer no longer needs a static upper bound enforced at startup. The max.in.flight.requests.per.connection configuration reverts to its original role: a pure connection-level cap on the total number of in-flight requests to a broker node, regardless of whether the producer is idempotent or transactional. The per-partition in-flight check described below takes over responsibility for ensuring the producer does not exceed the broker's deduplication window for any individual partition.

ProduceRequest / ProduceResponse

Add a new field ProducerStateBatchesToRetain to ProduceResponse that reflects the producer.state.batches.to.retain configuration of the topic partition.

ProduceRequest
 {
   "apiKey": 0,
   "type": "request",
   "listeners": ["broker"],
   "name": "ProduceRequest",
   // ...
-  "validVersions": "3-13",
+  // Version 14 is the same as version 13 (KIP-1269).
+  "validVersions": "3-14",
   // ...
 }


ProduceResponse
 {
   "apiKey": 0,
   "type": "response",
   "name": "ProduceResponse",
   // ...
-  "validVersions": "3-13",
+  // Version 14 adds ProducerStateBatchesToRetain as a tagged field (KIP-1269).
+  "validVersions": "3-14",
   "flexibleVersions": "9+",
   "fields": [
    { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
      "about": "Each produce response.", "fields": [
      { "name": "Name", "type": "string", "versions": "0-12", "entityType": "topicName", "mapKey": true, "ignorable": true,
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "mapKey": true, "ignorable": true, "about": "The unique topic ID" },
      // ...
  -   ]}
  +   ]},
  +   { "name": "ProducerStateBatchesToRetain", "type": "int32",
  +     "versions": "14+", "taggedVersions": "14+", "tag": 1, "default": "5",
  +     "about": "The maximum number of idempotent batches to retain in the broker." }
    ]},
    // ...
   ]
 }


Proposed Changes

Two-Level In-Flight Check

To support per-topic deduplication window sizes, the producer enforces two independent in-flight checks before sending a batch. The first is a per-partition check: for each topic partition, the producer tracks the number of batches currently in-flight to that partition and ensures it does not exceed the partition's discovered ProducerStateBatchesToRetain limit. This enforces the broker's deduplication window on a per-partition basis, so that a topic configured with a larger window does not interfere with a topic configured with a smaller one. The second is the existing per-connection check: the total number of in-flight requests to a broker node must not exceed max.in.flight.requests.per.connection. Both checks must pass before a batch is sent. The per-partition check is the new constraint introduced by this KIP; the per-connection check is unchanged from existing behavior. The per-partition limit is initially 5 and is updated dynamically as the producer receives ProduceResponse messages, as described in the Dynamic Capacity Discovery section below.
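The two checks described above can be sketched as a small bookkeeping class. This is a simplified illustration, not the producer's actual Sender logic: the class and method names are hypothetical, nodes and partitions are identified by plain strings, and each request is assumed to carry exactly one batch for one partition (real produce requests can carry batches for several partitions).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the per-partition and per-connection in-flight checks. */
final class TwoLevelInFlightCheck {
    static final int DEFAULT_PARTITION_LIMIT = 5;

    private final int maxInFlightPerConnection;
    private final Map<String, Integer> inFlightByNode = new HashMap<>();
    private final Map<String, Integer> inFlightByPartition = new HashMap<>();
    private final Map<String, Integer> limitByPartition = new HashMap<>();

    TwoLevelInFlightCheck(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    /** Records the limit discovered for a partition (ProducerStateBatchesToRetain). */
    void setPartitionLimit(String partition, int limit) {
        limitByPartition.put(partition, limit);
    }

    /** Both checks must pass before a batch may be sent. */
    boolean canSend(String node, String partition) {
        int limit = limitByPartition.getOrDefault(partition, DEFAULT_PARTITION_LIMIT);
        boolean partitionOk = inFlightByPartition.getOrDefault(partition, 0) < limit;
        boolean connectionOk = inFlightByNode.getOrDefault(node, 0) < maxInFlightPerConnection;
        return partitionOk && connectionOk;
    }

    /** Sends if allowed and updates both counters; returns whether the send happened. */
    boolean trySend(String node, String partition) {
        if (!canSend(node, partition)) {
            return false;
        }
        inFlightByNode.merge(node, 1, Integer::sum);
        inFlightByPartition.merge(partition, 1, Integer::sum);
        return true;
    }

    /** Called when a response (or failure) for an in-flight batch arrives. */
    void complete(String node, String partition) {
        inFlightByNode.merge(node, -1, Integer::sum);
        inFlightByPartition.merge(partition, -1, Integer::sum);
    }
}
```

Keeping the per-partition counters separate from the per-node counter is what lets a topic with a window of 20 and a topic with a window of 5 share one connection without affecting each other.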

Dynamic Capacity Discovery

Initial State (Safety First): When a connection is first established, the per-partition in-flight limit defaults to 5 for all partitions on that node. This ensures safe behavior with any broker version, since old brokers hardcode their deduplication window to 5 entries. The per-connection limit remains max.in.flight.requests.per.connection as configured by the user.

Discovery: Upon receiving a ProduceResponse from a partition leader, the producer reads the ProducerStateBatchesToRetain tagged field from each PartitionProduceResponse. This value reflects the producer.state.batches.to.retain configuration of that specific topic partition on the leader broker.

Adaptation:

  • If the field is present, the producer updates the per-partition in-flight limit for that partition to the received ProducerStateBatchesToRetain value.
  • If the field is absent, the producer retains the default per-partition limit of 5 for all partitions on that node.

Leader Change: When the producer detects a partition leader change, the producer resets the per-partition in-flight limit for that partition back to 5 and re-enters the discovery process with the new leader. This is necessary because the new leader may be an old broker that does not recognize producer.state.batches.to.retain and hardcodes its deduplication window to 5. Resetting to 5 on every leader change ensures correctness regardless of whether the new leader is an upgraded or non-upgraded broker, and the producer will rediscover the correct limit from the new leader's first ProduceResponse.
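The initial state, discovery, adaptation, and leader-change rules above amount to a small state machine per partition. The following is a minimal sketch under stated assumptions: the class and method names are hypothetical, partitions are identified by strings, and an absent tagged field is modeled as an empty OptionalInt that falls back to 5, mirroring the field's default.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical sketch of per-partition limit discovery on the producer side. */
final class PartitionLimitTracker {
    /** Safe limit for any broker version, including brokers with a hardcoded window of 5. */
    static final int DEFAULT_LIMIT = 5;

    private final Map<String, Integer> discoveredLimits = new HashMap<>();

    /** Current in-flight limit for a partition; 5 until a response says otherwise. */
    int limitFor(String partition) {
        return discoveredLimits.getOrDefault(partition, DEFAULT_LIMIT);
    }

    /** Applies the ProducerStateBatchesToRetain value from a response, if present. */
    void onProduceResponse(String partition, OptionalInt batchesToRetain) {
        if (batchesToRetain.isPresent()) {
            discoveredLimits.put(partition, batchesToRetain.getAsInt());
        } else {
            // Field absent (for example, an old broker): stay at the default of 5.
            discoveredLimits.remove(partition);
        }
    }

    /** Resets to 5 so the limit is rediscovered from the new leader's first response. */
    void onLeaderChange(String partition) {
        discoveredLimits.remove(partition);
    }

    public static void main(String[] args) {
        PartitionLimitTracker tracker = new PartitionLimitTracker();
        System.out.println(tracker.limitFor("topic-0"));
        tracker.onProduceResponse("topic-0", OptionalInt.of(20));
        System.out.println(tracker.limitFor("topic-0"));
        tracker.onLeaderChange("topic-0");
        System.out.println(tracker.limitFor("topic-0"));
    }
}
```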

Compatibility, Deprecation, and Migration Plan

New topic configuration with old broker

The producer.state.batches.to.retain configuration has no effect on old brokers. Old brokers do not recognize this topic configuration and will continue to use their hardcoded deduplication window of 5. Even if an operator sets producer.state.batches.to.retain to a value greater than 5 on a topic, the old broker ignores it and the ProducerStateBatchesToRetain field keeps its default value in PartitionProduceResponse. The new producer will therefore fall back to the default per-partition in-flight limit of 5 for all partitions on that node, as described in the Dynamic Capacity Discovery section. Operators should ensure all brokers in the cluster are upgraded before relying on values greater than 5.

New broker with old producer

The minimum value of producer.state.batches.to.retain is 5, which matches the deduplication window old producers assume. Old producers are still subject to the existing ConfigException at initialization if max.in.flight.requests.per.connection is set greater than 5 with idempotence or transactions enabled. Their behavior on the broker side is unchanged.

Old broker with new producer

When a new producer connects to an old broker, the old broker doesn't set the ProducerStateBatchesToRetain in ProduceResponse. The default value is 5, so the new producer retains the default per-partition in-flight limit of 5 for all partitions on that node, which matches the old broker's hardcoded deduplication window. The removal of the startup ConfigException for idempotent producers still takes effect on the producer side, but since the discovered per-partition limit defaults to 5, the producer will not send more than 5 in-flight batches per partition to an old broker, preserving correctness.


Test Plan

  • Unit test

    • Validate new configuration - default/minimal value.

    • Validate producer can learn ProducerStateBatchesToRetain from ProduceResponse and align with the partition leader.

    • Validate that the producer resets the per-partition in-flight limit to 5 upon detecting a partition leader change, and rediscovers the limit from the new leader's first ProduceResponse.
    • Validate that idempotent and transactional producers no longer throw ConfigException when max.in.flight.requests.per.connection is set greater than 5.
  • Integration test

    • Producer with max.in.flight.requests.per.connection=1 and topic with producer.state.batches.to.retain=20: producer has at most 1 in-flight request per connection (and therefore per partition), bounded by the connection-level check.
    • Producer with max.in.flight.requests.per.connection=20 and topic with producer.state.batches.to.retain=5: producer sends at most 5 in-flight batches per partition, bounded by the per-partition check.
    • Producer with max.in.flight.requests.per.connection=20 and two topics on the same broker, one with producer.state.batches.to.retain=5 and another with producer.state.batches.to.retain=20: producer respects each partition's limit independently without one topic affecting the other.
    • Producer connected to an old broker with producer.state.batches.to.retain=20 set on the topic: producer falls back to a per-partition limit of 5 since the old broker does not return ProducerStateBatchesToRetain in PartitionProduceResponse.
    • Upon partition leader change from a new broker to an old broker: producer resets the per-partition limit to 5 and does not exceed the old broker's hardcoded deduplication window.

Rejected Alternatives

Hardcoded Increase (e.g., to 20)

  • Proposal: Just bump the hardcoded NUM_BATCHES_TO_RETAIN to 20 in the broker code.

  • Rejection Reason: While memory usage is generally low, Kafka is used in diverse environments. Forcing a 4x increase in per-producer memory overhead for all users—including those who don't need high BDP throughput—is not acceptable. It must be opt-in/configurable.

Return ProducerStateBatchesToRetain via InitProducerIdResponse

  • Proposal: Return the limit in the InitProducerIdResponse instead of ProduceResponse.

  • Rejection Reason: Different brokers can have different producer.state.batches.to.retain values. After getting a PID, the producer can communicate with various partition leaders. In this design, the producer has no chance to learn the limits of the different brokers.

